Skip to content
This repository has been archived by the owner on Dec 21, 2023. It is now read-only.

Commit

Permalink
feat(go-utils): Added retry logic to cp-connector for contacting Kept…
Browse files Browse the repository at this point in the history
…n's control plane for registration and renewal of such (#503)

* feat(go-utils): Added retry logic for contacting Keptn's control plane for registration and renewal of such

Signed-off-by: warber <bernd.warmuth@dynatrace.com>

* cleanup

Signed-off-by: warber <bernd.warmuth@dynatrace.com>

* use buffered err channel

Signed-off-by: warber <bernd.warmuth@dynatrace.com>

* added connection loggers to nats

Signed-off-by: warber <bernd.warmuth@dynatrace.com>

* cleanup

Signed-off-by: warber <bernd.warmuth@dynatrace.com>
  • Loading branch information
warber committed Jul 12, 2022
1 parent 488b356 commit 69c90ea
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 24 deletions.
2 changes: 1 addition & 1 deletion pkg/sdk/connector/controlplane/controlplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func New(subscriptionSource subscriptionsource.SubscriptionSource, eventSource e
func (cp *ControlPlane) Register(ctx context.Context, integration Integration) error {
eventUpdates := make(chan types.EventUpdate)
subscriptionUpdates := make(chan []models.EventSubscription)
errC := make(chan error)
errC := make(chan error, 1)

var err error
registrationData := integration.RegistrationData()
Expand Down
12 changes: 11 additions & 1 deletion pkg/sdk/connector/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,18 @@ func NewFromEnv() *NatsConnector {
func (nc *NatsConnector) ensureConnection() (*nats.Conn, error) {

if !nc.connection.IsConnected() {
disconnectLogger := func(con *nats.Conn, err error) {
if err != nil {
nc.logger.Errorf("Disconnected from NATS due to an error: %v", err)
} else {
nc.logger.Info("Disconnected from NATS")
}
}
reconnectLogger := func(*nats.Conn) {
nc.logger.Info("Reconnected to NATS")
}
var err error
nc.connection, err = nats.Connect(nc.connectURL, nats.MaxReconnects(-1))
nc.connection, err = nats.Connect(nc.connectURL, nats.MaxReconnects(-1), nats.ReconnectHandler(reconnectLogger), nats.DisconnectErrHandler(disconnectLogger))

if err != nil {
return nil, fmt.Errorf("could not connect to NATS: %w", err)
Expand Down
92 changes: 74 additions & 18 deletions pkg/sdk/connector/subscriptionsource/subscriptionsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package subscriptionsource

import (
"context"
"errors"
"github.com/avast/retry-go"
"github.com/keptn/go-utils/pkg/sdk/connector/types"
"sync"
"time"
Expand All @@ -12,22 +14,40 @@ import (
"github.com/keptn/go-utils/pkg/sdk/connector/logger"
)

// ErrMaxPingRetriesExceeded is the sentinel error reported when the subscription
// source could not contact Keptn's control plane within the configured number
// of ping attempts. UniformSubscriptionSource sends it on the error channel
// passed to Start and then stops.
var ErrMaxPingRetriesExceeded = errors.New("maximum retries for polling event api exceeded")

// SubscriptionSource is the contract shared by the subscription sources of this
// package; both FixedSubscriptionSource and *UniformSubscriptionSource are
// asserted to implement it.
type SubscriptionSource interface {
// Start starts the source for the given registration data. Subscription lists
// are delivered on the subscription channel and failures on the error channel.
// NOTE(review): the WaitGroup is presumably marked done once the source has
// terminated (UniformSubscriptionSource does so) — confirm for other implementations.
Start(context.Context, types.RegistrationData, chan []models.EventSubscription, chan error, *sync.WaitGroup) error
// Register registers the given integration.
// NOTE(review): the returned string is presumably the integration ID — confirm.
Register(integration models.Integration) (string, error)
// Stop stops the source.
Stop() error
}

// Default settings that New applies unless an option overrides them.
const (
	// DefaultFetchInterval is how long the source waits between two pulls of
	// subscription updates from Keptn's control plane.
	DefaultFetchInterval = 5 * time.Second
	// DefaultMaxPingAttempts is how many times contacting Keptn's control plane
	// is attempted by default when renewing the registration.
	DefaultMaxPingAttempts = 10
	// DefaultPingAttemptsInterval is the default pause between two consecutive
	// attempts to contact Keptn's control plane when renewing the registration.
	DefaultPingAttemptsInterval = 3 * time.Second
)

var _ SubscriptionSource = FixedSubscriptionSource{}
var _ SubscriptionSource = (*UniformSubscriptionSource)(nil)

// UniformSubscriptionSource represents a source for uniform subscriptions
type UniformSubscriptionSource struct {
uniformAPI api.UniformV1Interface
clock clock.Clock
fetchInterval time.Duration
logger logger.Logger
quitC chan struct{}
uniformAPI api.UniformV1Interface
clock clock.Clock
fetchInterval time.Duration
logger logger.Logger
quitC chan struct{}
maxPingAttempts uint
pingAttemptsInterval time.Duration
}

func (s *UniformSubscriptionSource) Register(integration models.Integration) (string, error) {
Expand All @@ -53,14 +73,33 @@ func WithLogger(logger logger.Logger) func(s *UniformSubscriptionSource) {
}
}

// WithMaxPingAttempts returns an option for New that limits how many times the
// source tries to ping Keptn's control plane when renewing the registration.
func WithMaxPingAttempts(maxPingAttempts uint) func(s *UniformSubscriptionSource) {
	apply := func(source *UniformSubscriptionSource) {
		source.maxPingAttempts = maxPingAttempts
	}
	return apply
}

// WithPingAttemptsInterval returns an option for New that sets how long the
// source waits between two consecutive attempts to ping Keptn's control plane
// when renewing the registration.
func WithPingAttemptsInterval(duration time.Duration) func(s *UniformSubscriptionSource) {
	apply := func(source *UniformSubscriptionSource) {
		source.pingAttemptsInterval = duration
	}
	return apply
}

// New creates a new UniformSubscriptionSource
func New(uniformAPI api.UniformV1Interface, options ...func(source *UniformSubscriptionSource)) *UniformSubscriptionSource {
s := &UniformSubscriptionSource{
uniformAPI: uniformAPI,
clock: clock.New(),
fetchInterval: time.Second * 5,
quitC: make(chan struct{}, 1),
logger: logger.NewDefaultLogger()}
uniformAPI: uniformAPI,
clock: clock.New(),
fetchInterval: DefaultFetchInterval,
quitC: make(chan struct{}, 1),
logger: logger.NewDefaultLogger(),
maxPingAttempts: DefaultMaxPingAttempts,
pingAttemptsInterval: DefaultPingAttemptsInterval}

for _, o := range options {
o(s)
}
Expand All @@ -71,15 +110,31 @@ func New(uniformAPI api.UniformV1Interface, options ...func(source *UniformSubsc
func (s *UniformSubscriptionSource) Start(ctx context.Context, registrationData types.RegistrationData, subscriptionChannel chan []models.EventSubscription, errC chan error, wg *sync.WaitGroup) error {
s.logger.Debugf("UniformSubscriptionSource: Starting to fetch subscriptions for Integration ID %s", registrationData.ID)
ticker := s.clock.Ticker(s.fetchInterval)

go func() {
s.ping(registrationData.ID, subscriptionChannel)
if err := retry.Do(func() error {
return s.ping(registrationData.ID, subscriptionChannel)
}, retry.Attempts(s.maxPingAttempts), retry.Delay(s.pingAttemptsInterval), retry.DelayType(retry.FixedDelay)); err != nil {
s.logger.Errorf("Reached max number of attempts to ping control plane")
errC <- ErrMaxPingRetriesExceeded
wg.Done()
return
}

for {
select {
case <-ctx.Done():
wg.Done()
return
case <-ticker.C:
s.ping(registrationData.ID, subscriptionChannel)
if err := retry.Do(func() error {
return s.ping(registrationData.ID, subscriptionChannel)
}, retry.Attempts(s.maxPingAttempts), retry.Delay(s.pingAttemptsInterval), retry.DelayType(retry.FixedDelay)); err != nil {
s.logger.Errorf("Reached max number of attempts to ping control plane")
errC <- ErrMaxPingRetriesExceeded
wg.Done()
return
}
case <-s.quitC:
wg.Done()
return
Expand All @@ -94,15 +149,16 @@ func (s *UniformSubscriptionSource) Stop() error {
return nil
}

func (s *UniformSubscriptionSource) ping(registrationId string, subscriptionChannel chan []models.EventSubscription) {
s.logger.Debugf("UniformSubscriptionSource: Renewing Integration ID %s", registrationId)
updatedIntegrationData, err := s.uniformAPI.Ping(registrationId)
func (s *UniformSubscriptionSource) ping(registrationID string, subscriptionC chan []models.EventSubscription) error {
s.logger.Debugf("UniformSubscriptionSource: Renewing Integration ID %s", registrationID)
updatedIntegrationData, err := s.uniformAPI.Ping(registrationID)
if err != nil {
s.logger.Errorf("Unable to ping control plane: %v", err)
return
return err
}
s.logger.Debugf("UniformSubscriptionSource: Ping successful, got %d subscriptions for %s", len(updatedIntegrationData.Subscriptions), registrationId)
subscriptionChannel <- updatedIntegrationData.Subscriptions
s.logger.Debugf("UniformSubscriptionSource: Ping successful, got %d subscriptions for %s", len(updatedIntegrationData.Subscriptions), registrationID)
subscriptionC <- updatedIntegrationData.Subscriptions
return nil
}

// FixedSubscriptionSource can be used to use a fixed list of subscriptions rather than
Expand Down
49 changes: 45 additions & 4 deletions pkg/sdk/connector/subscriptionsource/subscriptionsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,68 @@ import (
"github.com/stretchr/testify/require"
)

func TestSubscriptionSourceCPPingFails(t *testing.T) {
func TestNumberOfFailedInitialTriesToPingExceedMaxAllowedAttempts(t *testing.T) {
initialRegistrationData := types.RegistrationData{}

uniformInterface := &fake.UniformAPIMock{
PingFn: func(s string) (*models.Integration, error) {
return nil, fmt.Errorf("error occured")
}}
subscriptionUpdates := make(chan []models.EventSubscription)
// this is the "consumer" of the subscription updates received via ping
// we don't expect to receive any update since every call fails
go func() {
<-subscriptionUpdates
require.FailNow(t, "got subscription event via channel")
}()

subscriptionSource := New(uniformInterface)
subscriptionSource := New(uniformInterface, WithMaxPingAttempts(2), WithPingAttemptsInterval(10*time.Millisecond))
clock := clock.NewMock()
subscriptionSource.clock = clock
wg := &sync.WaitGroup{}
wg.Add(1)
err := subscriptionSource.Start(context.TODO(), initialRegistrationData, subscriptionUpdates, make(chan error), wg)
errC := make(chan error)
err := subscriptionSource.Start(context.TODO(), initialRegistrationData, subscriptionUpdates, errC, wg)
require.NoError(t, err)
clock.Add(5 * time.Second)
// expect error via this channel
<-errC
// expect subscription source to finish
wg.Wait()
}

// TestNumberOfFailedSubsequentTriesToPingsExceedMaxAllowedAttempts verifies that
// a subscription source whose first ping succeeds but whose later pings keep
// failing reports an error on the error channel and then terminates.
func TestNumberOfFailedSubsequentTriesToPingsExceedMaxAllowedAttempts(t *testing.T) {
initialRegistrationData := types.RegistrationData{}

// counts the calls to the mocked Ping so only the very first one succeeds
pingCalled := 0
uniformInterface := &fake.UniformAPIMock{
PingFn: func(s string) (*models.Integration, error) {
pingCalled++
// simulate that subsequent pings will fail
if pingCalled > 1 {
return nil, fmt.Errorf("error occured")
}
return &models.Integration{}, nil
}}

subscriptionUpdates := make(chan []models.EventSubscription)
// this is the "consumer" of the result of the first (successful) ping;
// it receives exactly once because every later ping fails and therefore
// never publishes subscriptions
go func() { <-subscriptionUpdates }()

// allow only two attempts with a short pause in between so the retries are
// exhausted quickly
subscriptionSource := New(uniformInterface, WithMaxPingAttempts(2), WithPingAttemptsInterval(10*time.Millisecond))
// swap in a mock clock so the fetch ticker is driven by the clock.Add calls below
clock := clock.NewMock()
subscriptionSource.clock = clock
wg := &sync.WaitGroup{}
wg.Add(1)
errC := make(chan error)
err := subscriptionSource.Start(context.TODO(), initialRegistrationData, subscriptionUpdates, errC, wg)
require.NoError(t, err)
clock.Add(5 * time.Second) // fetch interval
clock.Add(5 * time.Second) // another fetch interval
// expect error via this channel
<-errC
// expect subscription source to finish
wg.Wait()
}

func TestSubscriptionSourceWithFetchInterval(t *testing.T) {
Expand Down

0 comments on commit 69c90ea

Please sign in to comment.