Skip to content
This repository has been archived by the owner on Dec 21, 2023. It is now read-only.

Commit

Permalink
fix(shipyard-controller): Avoid duplicate .triggered events (#5608) (#…
Browse files Browse the repository at this point in the history
…5612)

* fix(shipyard-controller): Avoid duplicate .triggered events (#5608)

Signed-off-by: Florian Bacher <florian.bacher@dynatrace.com>

* fix(shipyard-controller): Avoid duplicate .triggered events (#5608)

Signed-off-by: Florian Bacher <florian.bacher@dynatrace.com>

* fix(shipyard-controller): Avoid duplicate .triggered events (#5608)

Signed-off-by: Florian Bacher <florian.bacher@dynatrace.com>

* fix(shipyard-controller): Avoid duplicate .triggered events (#5608)

Signed-off-by: Florian Bacher <florian.bacher@dynatrace.com>

* extended integration test

Signed-off-by: Florian Bacher <florian.bacher@dynatrace.com>

* added log output to integration test

Signed-off-by: Florian Bacher <florian.bacher@dynatrace.com>

* added lock to sequence dispatcher

Signed-off-by: Florian Bacher <florian.bacher@dynatrace.com>
  • Loading branch information
bacherfl committed Oct 12, 2021
1 parent 449dfc5 commit e4c4a25
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 44 deletions.
22 changes: 14 additions & 8 deletions shipyard-controller/handler/fake/sequencedispatcher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions shipyard-controller/handler/fake/shipyardcontroller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions shipyard-controller/handler/projectmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func NewProjectManager(
}

func (pm *ProjectManager) Get() ([]*models.ExpandedProject, error) {
log.Info("Getting all projects")
allProjects, err := pm.ProjectMaterializedView.GetProjects()
if err != nil {
return nil, err
Expand All @@ -73,7 +72,6 @@ func (pm *ProjectManager) Get() ([]*models.ExpandedProject, error) {
}

func (pm *ProjectManager) GetByName(projectName string) (*models.ExpandedProject, error) {
log.Infof("Getting project with name %s", projectName)
project, err := pm.ProjectMaterializedView.GetProject(projectName)
if err != nil {
return nil, err
Expand Down
52 changes: 35 additions & 17 deletions shipyard-controller/handler/sequencedispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,37 @@ package handler

import (
"context"
"errors"
"fmt"
"github.com/benbjohnson/clock"
keptnv2 "github.com/keptn/go-utils/pkg/lib/v0_2_0"
"github.com/keptn/keptn/shipyard-controller/common"
"github.com/keptn/keptn/shipyard-controller/db"
"github.com/keptn/keptn/shipyard-controller/models"
log "github.com/sirupsen/logrus"
"sync"
"time"
)

var errSequenceBlocked = errors.New("sequence is currently blocked")

//go:generate moq -pkg fake -skip-ensure -out ./fake/sequencedispatcher.go . ISequenceDispatcher
// ISequenceDispatcher is responsible for dispatching events to be sent to the event broker
type ISequenceDispatcher interface {
Add(queueItem models.QueueItem) error
Run(ctx context.Context)
Run(ctx context.Context, startSequenceFunc func(event models.Event) error)
}

type SequenceDispatcher struct {
eventRepo db.EventRepo
eventQueueRepo db.EventQueueRepo
sequenceQueue db.SequenceQueueRepo
sequenceRepo db.TaskSequenceRepo
theClock clock.Clock
syncInterval time.Duration
eventChannel chan models.Event
eventRepo db.EventRepo
eventQueueRepo db.EventQueueRepo
sequenceQueue db.SequenceQueueRepo
sequenceRepo db.TaskSequenceRepo
theClock clock.Clock
syncInterval time.Duration
startSequenceFunc func(event models.Event) error
shipyardController shipyardController
mutex sync.Mutex
}

// NewSequenceDispatcher creates a new SequenceDispatcher
Expand All @@ -36,7 +42,6 @@ func NewSequenceDispatcher(
sequenceQueueRepo db.SequenceQueueRepo,
sequenceRepo db.TaskSequenceRepo,
syncInterval time.Duration,
eventChannel chan models.Event,
theClock clock.Clock,

) ISequenceDispatcher {
Expand All @@ -47,19 +52,28 @@ func NewSequenceDispatcher(
sequenceRepo: sequenceRepo,
theClock: theClock,
syncInterval: syncInterval,
eventChannel: eventChannel,
mutex: sync.Mutex{},
}
}

func (sd *SequenceDispatcher) Add(queueItem models.QueueItem) error {
if err := sd.sequenceQueue.QueueSequence(queueItem); err != nil {
return err
// try to dispatch the sequence immediately
if err := sd.dispatchSequence(queueItem); err != nil {
if err == errSequenceBlocked {
// if the sequence is currently blocked, insert it into the queue
if err2 := sd.sequenceQueue.QueueSequence(queueItem); err2 != nil {
return err2
}
} else {
return err
}
}
return sd.dispatchSequence(queueItem)
return nil
}

func (sd *SequenceDispatcher) Run(ctx context.Context) {
func (sd *SequenceDispatcher) Run(ctx context.Context, startSequenceFunc func(event models.Event) error) {
ticker := sd.theClock.Ticker(sd.syncInterval)
sd.startSequenceFunc = startSequenceFunc
go func() {
for {
select {
Expand Down Expand Up @@ -93,10 +107,12 @@ func (sd *SequenceDispatcher) dispatchSequences() {
}

func (sd *SequenceDispatcher) dispatchSequence(queuedSequence models.QueueItem) error {
sd.mutex.Lock()
defer sd.mutex.Unlock()
// first, check if the sequence is currently paused
if sd.eventQueueRepo.IsSequenceOfEventPaused(queuedSequence.Scope) {
log.Infof("Sequence %s is currently paused. Will not start it yet.", queuedSequence.Scope.KeptnContext)
return nil
return errSequenceBlocked
}
// fetch all sequences that are currently running in the stage of the project where the sequence should run
runningSequencesInStage, err := sd.sequenceRepo.GetTaskSequences(queuedSequence.Scope.Project, models.TaskSequenceEvent{
Expand All @@ -110,7 +126,7 @@ func (sd *SequenceDispatcher) dispatchSequence(queuedSequence models.QueueItem)
// if there is a sequence running in the stage, we cannot trigger this sequence yet
if sd.areActiveSequencesBlockingQueuedSequences(runningSequencesInStage) {
log.Infof("sequence %s cannot be started yet because sequences are still running in stage %s", queuedSequence.Scope.KeptnContext, queuedSequence.Scope.Stage)
return nil
return errSequenceBlocked
}

events, err := sd.eventRepo.GetEvents(queuedSequence.Scope.Project, common.EventFilter{
Expand All @@ -127,7 +143,9 @@ func (sd *SequenceDispatcher) dispatchSequence(queuedSequence models.QueueItem)

sequenceTriggeredEvent := events[0]

sd.eventChannel <- sequenceTriggeredEvent
if err := sd.startSequenceFunc(sequenceTriggeredEvent); err != nil {
return fmt.Errorf("could not start task sequence %s: %s", queuedSequence.EventID, err.Error())
}

return sd.sequenceQueue.DeleteQueuedSequences(queuedSequence)
}
Expand Down
23 changes: 12 additions & 11 deletions shipyard-controller/handler/sequencedispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
func TestSequenceDispatcher(t *testing.T) {
theClock := clock.NewMock()

startSequenceChannel := make(chan models.Event)

startSequenceCalls := []models.Event{}
triggeredEvents := []models.Event{
{
Expand All @@ -34,13 +32,6 @@ func TestSequenceDispatcher(t *testing.T) {
currentTaskSequences := []models.TaskSequenceEvent{}
mockQueue := []models.QueueItem{}

go func() {
for {
startSequenceCall := <-startSequenceChannel
startSequenceCalls = append(startSequenceCalls, startSequenceCall)
}
}()

mockEventRepo := &db_mock.EventRepoMock{
GetEventsFunc: func(project string, filter common.EventFilter, status ...common.EventStatus) ([]models.Event, error) {
return triggeredEvents, nil
Expand Down Expand Up @@ -77,8 +68,12 @@ func TestSequenceDispatcher(t *testing.T) {
},
}

sequenceDispatcher := handler.NewSequenceDispatcher(mockEventRepo, mockEventQueueRepo, mockSequenceQueueRepo, mockTaskSequenceRepo, 10*time.Second, startSequenceChannel, theClock)
sequenceDispatcher.Run(context.Background())
sequenceDispatcher := handler.NewSequenceDispatcher(mockEventRepo, mockEventQueueRepo, mockSequenceQueueRepo, mockTaskSequenceRepo, 10*time.Second, theClock)

sequenceDispatcher.Run(context.Background(), func(event models.Event) error {
startSequenceCalls = append(startSequenceCalls, event)
return nil
})

// check if repos are queried
theClock.Add(11 * time.Second)
Expand Down Expand Up @@ -112,6 +107,9 @@ func TestSequenceDispatcher(t *testing.T) {
require.Equal(t, *mockEventRepo.GetEventsCalls()[0].Filter.ID, queueItem.EventID)
require.Equal(t, mockEventRepo.GetEventsCalls()[0].Status[0], common.TriggeredEvent)

// if the sequence has been dispatched immediately, we do not need to insert it into the queue
require.Empty(t, mockSequenceQueueRepo.QueueSequenceCalls())

// has the queueItem been removed properly?
require.Len(t, mockSequenceQueueRepo.DeleteQueuedSequencesCalls(), 1)
require.Equal(t, queueItem, mockSequenceQueueRepo.DeleteQueuedSequencesCalls()[0].ItemFilter)
Expand Down Expand Up @@ -159,4 +157,7 @@ func TestSequenceDispatcher(t *testing.T) {
// GetEvents and DeleteQueuedSequences should not have been called again at this point
require.Len(t, mockEventRepo.GetEventsCalls(), 1)
require.Len(t, mockSequenceQueueRepo.DeleteQueuedSequencesCalls(), 1)

// item should have been added to queue
require.Len(t, mockSequenceQueueRepo.QueueSequenceCalls(), 1)
}
7 changes: 4 additions & 3 deletions shipyard-controller/handler/shipyardcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type IShipyardController interface {
GetTriggeredEventsOfProject(project string, filter common.EventFilter) ([]models.Event, error)
HandleIncomingEvent(event models.Event, waitForCompletion bool) error
ControlSequence(controlSequence common.SequenceControl) error
StartTaskSequence(event models.Event) error
}

type shipyardController struct {
Expand Down Expand Up @@ -90,7 +91,7 @@ func (sc *shipyardController) registerToChannels(ctx context.Context) {
log.Infof("stop listening to channels")
return
case startSequenceEvent := <-sc.startSequenceChan:
err := sc.startTaskSequence(startSequenceEvent)
err := sc.StartTaskSequence(startSequenceEvent)
if err != nil {
log.WithError(err).Error("could not start task sequence")
}
Expand Down Expand Up @@ -508,7 +509,7 @@ func (sc *shipyardController) handleTriggeredEvent(event models.Event) error {
Timestamp: time.Now().UTC(),
})
}
return sc.startTaskSequence(event)
return sc.StartTaskSequence(event)
}

func (sc *shipyardController) onTriggerSequenceFailed(event models.Event, eventScope *models.EventScope, msg string, taskSequenceName string) error {
Expand All @@ -534,7 +535,7 @@ func (sc *shipyardController) onTriggerSequenceFailed(event models.Event, eventS
}, taskSequenceName, event.ID)
}

func (sc *shipyardController) startTaskSequence(event models.Event) error {
func (sc *shipyardController) StartTaskSequence(event models.Event) error {
eventScope, err := models.NewEventScope(event)
if err != nil {
return err
Expand Down
3 changes: 1 addition & 2 deletions shipyard-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ func main() {
createSequenceQueueRepo(),
createTaskSequenceRepo(),
getDurationFromEnvVar(envVarSequenceDispatchIntervalSec, envVarSequenceDispatchIntervalSecDefault),
sequenceDispatcherChannel,
clock.New(),
)

Expand All @@ -118,7 +117,7 @@ func main() {
sequenceTimeoutChannel,
sequenceControlChannel,
)
sequenceDispatcher.Run(context.Background())
sequenceDispatcher.Run(context.Background(), shipyardController.StartTaskSequence)

engine := gin.Default()
apiV1 := engine.Group("/v1")
Expand Down
1 change: 0 additions & 1 deletion test/go-tests/sequencecontrol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ func Test_SequenceControl_Abort(t *testing.T) {
VerifySequenceEndsUpInState(t, projectName, &models.EventContext{&keptnContextID}, 2*time.Minute, []string{scmodels.SequenceFinished})

require.Nil(t, err)

}

func Test_SequenceControl_PauseAndResume(t *testing.T) {
Expand Down

0 comments on commit e4c4a25

Please sign in to comment.