From 1bc93f339b43f82c1735d59041f9358837f93ae5 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Tue, 3 May 2022 16:25:30 +0200 Subject: [PATCH] feat(shipyard-controller): Store sequence executions in new format without potential dots (.) in property names (#7605) * introduce db model for sequence executions Signed-off-by: Florian Bacher * transformation functions for sequence execution state Signed-off-by: Florian Bacher * added unit test Signed-off-by: Florian Bacher * also store task properties as json string Signed-off-by: Florian Bacher * conversion between sequence execution schemas Signed-off-by: Florian Bacher * added methods to decode back from internal representation of sequence executions Signed-off-by: Florian Bacher * added unit tests Signed-off-by: Florian Bacher * added unit tests Signed-off-by: Florian Bacher * unit tests Signed-off-by: Florian Bacher * added model transformer option for sequence execution repo Signed-off-by: Florian Bacher * added tests for sequence execution migrator Signed-off-by: Florian Bacher * rename properties that are encoded differently Signed-off-by: Florian Bacher * run sequence execution migrator Signed-off-by: Florian Bacher * replace items during migration to prevent old properties from being kept Signed-off-by: Florian Bacher * trigger build Signed-off-by: Florian Bacher * additional unit tests Signed-off-by: Florian Bacher * fixed unit tests Signed-off-by: Florian Bacher --- .../db/migration/project_mv_migration_test.go | 4 +- .../migration/sequence_execution_migration.go | 67 ++++ .../sequence_execution_migration_test.go | 336 ++++++++++++++++++ .../models/sequence_execution/transformer.go | 11 + .../v1/sequence_execution.go | 161 +++++++++ .../v1/sequence_execution_test.go | 172 +++++++++ .../sequence_execution/v1/transformer.go | 154 ++++++++ .../sequence_execution/v1/transformer_test.go | 267 ++++++++++++++ .../db/mongodb_sequence_execution_repo.go | 73 +++- .../mongodb_sequence_execution_repo_test.go | 28 +- shipyard-controller/main.go | 8 + .../models/sequence_execution.go | 3 + 12 files changed, 1266 insertions(+), 18 deletions(-) create mode 100644 shipyard-controller/db/migration/sequence_execution_migration.go create mode 100644 shipyard-controller/db/migration/sequence_execution_migration_test.go create mode 100644 shipyard-controller/db/models/sequence_execution/transformer.go create mode 100644 shipyard-controller/db/models/sequence_execution/v1/sequence_execution.go create mode 100644 shipyard-controller/db/models/sequence_execution/v1/sequence_execution_test.go create mode 100644 shipyard-controller/db/models/sequence_execution/v1/transformer.go create mode 100644 shipyard-controller/db/models/sequence_execution/v1/transformer_test.go diff --git a/shipyard-controller/db/migration/project_mv_migration_test.go b/shipyard-controller/db/migration/project_mv_migration_test.go index f664612002..72632bfe27 100644 --- a/shipyard-controller/db/migration/project_mv_migration_test.go +++ b/shipyard-controller/db/migration/project_mv_migration_test.go @@ -22,11 +22,13 @@ func setupLocalMongoDB() func() { mongoServer, err := memongo.Start(mongoDbVersion) randomDbName := memongo.RandomDatabase() + mongoURL := fmt.Sprintf("%s/%s", mongoServer.URI(), randomDbName) os.Setenv("MONGODB_DATABASE", randomDbName) - os.Setenv("MONGODB_EXTERNAL_CONNECTION_STRING", fmt.Sprintf("%s/%s", mongoServer.URI(), randomDbName)) + os.Setenv("MONGODB_EXTERNAL_CONNECTION_STRING", mongoURL) var mongoDBClient *mongo.Client mongoDBClient, err = mongo.NewClient(options.Client().ApplyURI(mongoServer.URI())) + logger.Infof("MongoDB Server runnning at: %s", mongoURL) if err != nil { logger.Fatalf("Mongo Client setup failed: %s", err) } diff --git a/shipyard-controller/db/migration/sequence_execution_migration.go b/shipyard-controller/db/migration/sequence_execution_migration.go new file mode 100644 index 0000000000..28d2461d97 --- /dev/null +++ b/shipyard-controller/db/migration/sequence_execution_migration.go @@ -0,0 +1,67 @@ +package migration + +import ( + "fmt" + apimodels "github.com/keptn/go-utils/pkg/api/models" + keptnv2 "github.com/keptn/go-utils/pkg/lib/v0_2_0" + "github.com/keptn/keptn/shipyard-controller/db" + v1 "github.com/keptn/keptn/shipyard-controller/db/models/sequence_execution/v1" + "github.com/keptn/keptn/shipyard-controller/models" + logger "github.com/sirupsen/logrus" +) + +// NewSequenceExecutionMigrator creates a new SequenceExecutionMigrator +// Internally it is using the SequenceExecutionJsonStringRepo decorator +// which stores the arbitrary event payload sent by keptn integrations as Json strings to avoid having property names with dots (.) in them +func NewSequenceExecutionMigrator(dbConnection *db.MongoDBConnection) *SequenceExecutionMigrator { + return &SequenceExecutionMigrator{ + projectRepo: db.NewMongoDBKeyEncodingProjectsRepo(dbConnection), + sequenceExecutionRepo: db.NewMongoDBSequenceExecutionRepo(dbConnection, db.WithSequenceExecutionModelTransformer(&v1.ModelTransformer{})), + } +} + +type SequenceExecutionMigrator struct { + sequenceExecutionRepo db.SequenceExecutionRepo + projectRepo db.ProjectRepo +} + +// Run retrieves all existing sequence executions from the repository +// and performs an update operation on each of them using the SequenceExecutionJsonStringRepo. +// This way, sequence executions containing stored with the previous format are migrated to the new one +func (s *SequenceExecutionMigrator) Run() error { + projects, err := s.projectRepo.GetProjects() + if err != nil { + return fmt.Errorf("could not migrate sequence executions to new format: %w", err) + } + s.updateSequenceExecutionsOfProject(projects) + return nil +} + +func (s *SequenceExecutionMigrator) updateSequenceExecutionsOfProject(projects []*apimodels.ExpandedProject) { + if projects == nil { + return + } + for _, project := range projects { + sequenceExecutions, err := s.sequenceExecutionRepo.Get(models.SequenceExecutionFilter{ + Scope: models.EventScope{ + EventData: keptnv2.EventData{ + Project: project.ProjectName, + }, + }}) + if err != nil { + logger.Errorf("Could not retrieve sequence executions for project %s: %v", project.ProjectName, err) + continue + } + for _, sequenceExecution := range sequenceExecutions { + if sequenceExecution.SchemaVersion == v1.SchemaVersionV1 { + continue + } + if err := s.sequenceExecutionRepo.Upsert(sequenceExecution, &models.SequenceExecutionUpsertOptions{ + Replace: true, + }); err != nil { + logger.Errorf("Could not update sequence execution with ID %s for project %s: %v", sequenceExecution.ID, project.ProjectName, err) + continue + } + } + } +} diff --git a/shipyard-controller/db/migration/sequence_execution_migration_test.go b/shipyard-controller/db/migration/sequence_execution_migration_test.go new file mode 100644 index 0000000000..d592eeb616 --- /dev/null +++ b/shipyard-controller/db/migration/sequence_execution_migration_test.go @@ -0,0 +1,336 @@ +package migration + +import ( + "errors" + apimodels "github.com/keptn/go-utils/pkg/api/models" + keptnv2 "github.com/keptn/go-utils/pkg/lib/v0_2_0" + "github.com/keptn/keptn/shipyard-controller/db" + db_mock "github.com/keptn/keptn/shipyard-controller/db/mock" + v1 "github.com/keptn/keptn/shipyard-controller/db/models/sequence_execution/v1" + "github.com/keptn/keptn/shipyard-controller/models" + "github.com/stretchr/testify/require" + "testing" +) + +var testSequenceExecution = models.SequenceExecution{ + ID: "id", + Sequence: keptnv2.Sequence{ + Name: "delivery", + Tasks: []keptnv2.Task{ + { + Name: "deployment", + Properties: map[string]interface{}{ + "deployment.strategy": "direct", + }, + }, + { + Name: "evaluation", + }, + { + Name: "release", + }, + }, + }, + Status: models.SequenceExecutionStatus{ + State: "started", + StateBeforePause: "", + PreviousTasks: []models.TaskExecutionResult{ + { + Name: "deployment", + TriggeredID: "tr1", + Result: "pass", + Status: "succeeded", + Properties: map[string]interface{}{ + "foo.bar": "xyz", + }, + }, + { + Name: "evaluation", + TriggeredID: "tr2", + Result: "pass", + Status: "succeeded", + Properties: map[string]interface{}{ + "foo": map[string]interface{}{ + "bar": "xyz", + }, + }, + }, + }, + CurrentTask: models.TaskExecutionState{ + Name: "release", + TriggeredID: "tr3", + Events: []models.TaskEvent{ + { + EventType: keptnv2.GetStartedEventType("release"), + Source: "helm", + }, + { + EventType: keptnv2.GetFinishedEventType("release"), + Source: "helm", + Properties: map[string]interface{}{ + "release.xyz": "foo", + }, + }, + }, + }, + }, + Scope: models.EventScope{ + EventData: keptnv2.EventData{ + Project: "my-project", + Stage: "my-stage", + Service: "my-service", + }, + KeptnContext: "ctx1", + }, + InputProperties: map[string]interface{}{ + "foo.bar": "xyz", + }, +} + +func TestSequenceExecutionMigrator_MigrateSequenceExecutions(t *testing.T) { + defer setupLocalMongoDB()() + + dbConnection := db.GetMongoDBConnectionInstance() + projectRepo := db.NewMongoDBKeyEncodingProjectsRepo(dbConnection) + + // first, create two projects to let the sequence migrator know which projects we want to migrate + err := projectRepo.CreateProject(&apimodels.ExpandedProject{ProjectName: "my-project"}) + require.Nil(t, err) + + err = projectRepo.CreateProject(&apimodels.ExpandedProject{ProjectName: "my-second-project"}) + require.Nil(t, err) + + // create a sequence execution repo without the transformer to store sequence executions in old format + oldSequenceExecutionRepo := db.NewMongoDBSequenceExecutionRepo(dbConnection, db.WithSequenceExecutionModelTransformer(nil)) + + // insert sequence executions for different projects + sequenceExecution1 := testSequenceExecution + sequenceExecution1.Scope.Project = "my-project" + err = oldSequenceExecutionRepo.Upsert(sequenceExecution1, nil) + require.Nil(t, err) + + sequenceExecution2 := testSequenceExecution + sequenceExecution2.Scope.Project = "my-second-project" + err = oldSequenceExecutionRepo.Upsert(sequenceExecution2, nil) + require.Nil(t, err) + + sm := NewSequenceExecutionMigrator(dbConnection) + + err = sm.Run() + require.Nil(t, err) + + // now, create a sequence execution repo (which has the new model transformer by default), and try to retrieve the migrated sequence executions + newSequenceExecutionRepo := db.NewMongoDBSequenceExecutionRepo(dbConnection) + + migratedSE1, err := newSequenceExecutionRepo.Get(models.SequenceExecutionFilter{Scope: models.EventScope{EventData: keptnv2.EventData{Project: "my-project"}}}) + require.Nil(t, err) + + require.Len(t, migratedSE1, 1) + // set the SchemaVersion property of the original sequence execution here, so we can use require.Equal next + sequenceExecution1.SchemaVersion = v1.SchemaVersionV1 + require.Equal(t, sequenceExecution1, migratedSE1[0]) + + migratedSE2, err := newSequenceExecutionRepo.Get(models.SequenceExecutionFilter{Scope: models.EventScope{EventData: keptnv2.EventData{Project: "my-second-project"}}}) + require.Nil(t, err) + + require.Len(t, migratedSE2, 1) + // set the SchemaVersion property of the original sequence execution here, so we can use require.Equal next + sequenceExecution2.SchemaVersion = v1.SchemaVersionV1 + require.Equal(t, sequenceExecution2, migratedSE2[0]) +} + +func TestSequenceExecutionMigrator_MigrateSequenceExecutions_MixedOldAndNew(t *testing.T) { + defer setupLocalMongoDB()() + + dbConnection := db.GetMongoDBConnectionInstance() + projectRepo := db.NewMongoDBKeyEncodingProjectsRepo(dbConnection) + + // first, create two projects to let the sequence migrator know which projects we want to migrate + err := projectRepo.CreateProject(&apimodels.ExpandedProject{ProjectName: "my-project"}) + require.Nil(t, err) + + err = projectRepo.CreateProject(&apimodels.ExpandedProject{ProjectName: "my-second-project"}) + require.Nil(t, err) + + // create a sequence execution repo without the transformer to store sequence executions in old format + oldSequenceExecutionRepo := db.NewMongoDBSequenceExecutionRepo(dbConnection, db.WithSequenceExecutionModelTransformer(nil)) + + // and another one with the transformer + newSequenceExecutionRepo := db.NewMongoDBSequenceExecutionRepo(dbConnection) + + // insert sequence executions for different projects + sequenceExecution1 := testSequenceExecution + sequenceExecution1.Scope.Project = "my-project" + err = oldSequenceExecutionRepo.Upsert(sequenceExecution1, nil) + require.Nil(t, err) + + sequenceExecution2 := testSequenceExecution + sequenceExecution2.Scope.Project = "my-second-project" + err = newSequenceExecutionRepo.Upsert(sequenceExecution2, nil) + require.Nil(t, err) + + sm := NewSequenceExecutionMigrator(dbConnection) + + err = sm.Run() + require.Nil(t, err) + + migratedSE1, err := newSequenceExecutionRepo.Get(models.SequenceExecutionFilter{Scope: models.EventScope{EventData: keptnv2.EventData{Project: "my-project"}}}) + require.Nil(t, err) + + require.Len(t, migratedSE1, 1) + // set the SchemaVersion property of the original sequence execution here, so we can use require.Equal next + sequenceExecution1.SchemaVersion = v1.SchemaVersionV1 + require.Equal(t, sequenceExecution1, migratedSE1[0]) + + migratedSE2, err := newSequenceExecutionRepo.Get(models.SequenceExecutionFilter{Scope: models.EventScope{EventData: keptnv2.EventData{Project: "my-second-project"}}}) + require.Nil(t, err) + + require.Len(t, migratedSE2, 1) + // set the SchemaVersion property of the original sequence execution here, so we can use require.Equal next + sequenceExecution2.SchemaVersion = v1.SchemaVersionV1 + require.Equal(t, sequenceExecution2, migratedSE2[0]) +} + +func TestSequenceExecutionMigrator_MigrateMultipleTimes(t *testing.T) { + defer setupLocalMongoDB()() + + dbConnection := db.GetMongoDBConnectionInstance() + projectRepo := db.NewMongoDBKeyEncodingProjectsRepo(dbConnection) + + // first, create two projects to let the sequence migrator know which projects we want to migrate + err := projectRepo.CreateProject(&apimodels.ExpandedProject{ProjectName: "my-project"}) + require.Nil(t, err) + + err = projectRepo.CreateProject(&apimodels.ExpandedProject{ProjectName: "my-second-project"}) + require.Nil(t, err) + + // create a sequence execution repo without the transformer to store sequence executions in old format + oldSequenceExecutionRepo := db.NewMongoDBSequenceExecutionRepo(dbConnection, db.WithSequenceExecutionModelTransformer(nil)) + + // insert sequence executions for different projects + sequenceExecution1 := testSequenceExecution + sequenceExecution1.Scope.Project = "my-project" + err = oldSequenceExecutionRepo.Upsert(sequenceExecution1, nil) + require.Nil(t, err) + + sm := NewSequenceExecutionMigrator(dbConnection) + + // run the migrator multiple times + + for i := 0; i < 10; i++ { + err = sm.Run() + require.Nil(t, err) + } + // now, create a sequence execution repo (which has the new model transformer by default), and try to retrieve the migrated sequence executions + newSequenceExecutionRepo := db.NewMongoDBSequenceExecutionRepo(dbConnection) + + migratedSE1, err := newSequenceExecutionRepo.Get(models.SequenceExecutionFilter{Scope: models.EventScope{EventData: keptnv2.EventData{Project: "my-project"}}}) + require.Nil(t, err) + + // verify that one sequence is returned, i.e. the migrator should not duplicate anything + require.Len(t, migratedSE1, 1) + // set the SchemaVersion property of the original sequence execution here, so we can use require.Equal next + sequenceExecution1.SchemaVersion = v1.SchemaVersionV1 + require.Equal(t, sequenceExecution1, migratedSE1[0]) +} + +func TestSequenceExecutionMigrator_RetrievingProjectsFails(t *testing.T) { + + sm := NewSequenceExecutionMigrator(db.GetMongoDBConnectionInstance()) + + sm.projectRepo = &db_mock.ProjectRepoMock{ + GetProjectsFunc: func() ([]*apimodels.ExpandedProject, error) { + return nil, errors.New("oops") + }, + } + + err := sm.Run() + + require.NotNil(t, err) +} + +func TestSequenceExecutionMigrator_RetrievingSequencesFails(t *testing.T) { + + sm := NewSequenceExecutionMigrator(nil) + + sm.projectRepo = &db_mock.ProjectRepoMock{ + GetProjectsFunc: func() ([]*apimodels.ExpandedProject, error) { + return []*apimodels.ExpandedProject{ + { + ProjectName: "my-project", + }, + { + ProjectName: "my-other-project", + }, + }, nil + }, + } + + mockSERepo := &db_mock.SequenceExecutionRepoMock{ + GetFunc: func(filter models.SequenceExecutionFilter) ([]models.SequenceExecution, error) { + // return error for "my-project + if filter.Scope.Project == "my-project" { + return nil, errors.New("oops") + } + // for everything else, return a slice containing a sequence execution + return []models.SequenceExecution{ + testSequenceExecution, + }, nil + }, + UpsertFunc: func(item models.SequenceExecution, options *models.SequenceExecutionUpsertOptions) error { + return nil + }, + } + sm.sequenceExecutionRepo = mockSERepo + + err := sm.Run() + + // after the sequences for the first project could not be retrieved, the migrator should have continued with the other project's sequences + require.Len(t, mockSERepo.GetCalls(), 2) + require.Len(t, mockSERepo.UpsertCalls(), 1) + require.Equal(t, testSequenceExecution, mockSERepo.UpsertCalls()[0].Item) + + require.Nil(t, err) +} + +func TestSequenceExecutionMigrator_UpdatingOneSequenceFails(t *testing.T) { + + sm := NewSequenceExecutionMigrator(nil) + + sm.projectRepo = &db_mock.ProjectRepoMock{ + GetProjectsFunc: func() ([]*apimodels.ExpandedProject, error) { + return []*apimodels.ExpandedProject{ + { + ProjectName: "my-project", + }, + { + ProjectName: "my-other-project", + }, + }, nil + }, + } + + mockSERepo := &db_mock.SequenceExecutionRepoMock{ + GetFunc: func(filter models.SequenceExecutionFilter) ([]models.SequenceExecution, error) { + return []models.SequenceExecution{ + testSequenceExecution, + }, nil + }, + } + mockSERepo.UpsertFunc = func(item models.SequenceExecution, options *models.SequenceExecutionUpsertOptions) error { + // return an error for the first call + if len(mockSERepo.UpsertCalls()) == 0 { + return errors.New("oops") + } + return nil + } + sm.sequenceExecutionRepo = mockSERepo + + err := sm.Run() + + // after the sequences for the first project could not be migrated, the migrator should have continued with the other project's sequences + require.Len(t, mockSERepo.GetCalls(), 2) + require.Len(t, mockSERepo.UpsertCalls(), 2) + + require.Nil(t, err) +} diff --git a/shipyard-controller/db/models/sequence_execution/transformer.go b/shipyard-controller/db/models/sequence_execution/transformer.go new file mode 100644 index 0000000000..f37fd0d7a6 --- /dev/null +++ b/shipyard-controller/db/models/sequence_execution/transformer.go @@ -0,0 +1,11 @@ +package sequence_execution + +import "github.com/keptn/keptn/shipyard-controller/models" + +// ModelTransformer is an interface that defines functions for transforming between the internal representation +// of a sequence execution and the model structure outside the db package +type ModelTransformer interface { + TransformToDBModel(execution models.SequenceExecution) interface{} + TransformEventToDBModel(event models.TaskEvent) interface{} + TransformToSequenceExecution(dbItem interface{}) (*models.SequenceExecution, error) +} diff --git a/shipyard-controller/db/models/sequence_execution/v1/sequence_execution.go b/shipyard-controller/db/models/sequence_execution/v1/sequence_execution.go new file mode 100644 index 0000000000..a6e4f883f7 --- /dev/null +++ b/shipyard-controller/db/models/sequence_execution/v1/sequence_execution.go @@ -0,0 +1,161 @@ +package v1 + +import ( + "encoding/json" + keptnv2 "github.com/keptn/go-utils/pkg/lib/v0_2_0" + "github.com/keptn/keptn/shipyard-controller/models" +) + +const SchemaVersionV1 = "1" + +type SchemaVersion struct { + SchemaVersion string `json:"schemaVersion" bson:"schemaVersion"` +} +type JsonStringEncodedSequenceExecution struct { + ID string `json:"_id" bson:"_id"` + // SchemaVersion indicates the version of the schema - needed to decide if items in collection need to be migrated + SchemaVersion `bson:",inline"` + // Sequence contains the complete sequence definition + Sequence Sequence `json:"sequence" bson:"sequence"` + Status SequenceExecutionStatus `json:"status" bson:"status"` + Scope models.EventScope `json:"scope" bson:"scope"` + // EncodedInputProperties contains properties of the event which triggered the task sequence + EncodedInputProperties string `json:"encodedInputProperties" bson:"encodedInputProperties"` +} + +type Sequence struct { + Name string `json:"name" bson:"name"` + Tasks []Task `json:"tasks" bson:"tasks"` +} + +func (s Sequence) DecodeTasks() []keptnv2.Task { + tasks := []keptnv2.Task{} + + for _, task := range s.Tasks { + newTask := keptnv2.Task{ + Name: task.Name, + TriggeredAfter: task.TriggeredAfter, + } + if task.EncodedProperties != "" { + properties := map[string]interface{}{} + if err := json.Unmarshal([]byte(task.EncodedProperties), &properties); err == nil { + newTask.Properties = properties + } + } + tasks = append(tasks, newTask) + } + return tasks +} + +type Task struct { + Name string `json:"name" bson:"name"` + TriggeredAfter string `json:"triggeredAfter,omitempty" bson:"triggeredAfter,omitempty"` + EncodedProperties string `json:"encodedProperties" bson:"encodedProperties"` +} + +type SequenceExecutionStatus struct { + State string `json:"state" bson:"state"` // triggered, waiting, suspended (approval in progress), paused, finished, cancelled, timedOut + // StateBeforePause is needed to keep track of the state before a sequence has been paused. Example: when a sequence has been paused while being queued, and then resumed, it should not be set to started immediately, but to the state it had before + StateBeforePause string `json:"stateBeforePause" bson:"stateBeforePause"` + // PreviousTasks contains the results of all completed tasks of the sequence + PreviousTasks []TaskExecutionResult `json:"previousTasks" bson:"previousTasks"` + // CurrentTask represents the state of the currently active task + CurrentTask TaskExecutionState `json:"currentTask" bson:"currentTask"` +} + +func (s SequenceExecutionStatus) DecodePreviousTasks() []models.TaskExecutionResult { + result := []models.TaskExecutionResult{} + + for _, previousTask := range s.PreviousTasks { + newPreviousTask := models.TaskExecutionResult{ + Name: previousTask.Name, + TriggeredID: previousTask.TriggeredID, + Result: previousTask.Result, + Status: previousTask.Status, + } + + if previousTask.EncodedProperties != "" { + properties := map[string]interface{}{} + if err := json.Unmarshal([]byte(previousTask.EncodedProperties), &properties); err == nil { + newPreviousTask.Properties = properties + } + } + + result = append(result, newPreviousTask) + } + return result +} + +type TaskExecutionResult struct { + Name string `json:"name" bson:"name"` + TriggeredID string `json:"triggeredID" bson:"triggeredID"` + Result keptnv2.ResultType `json:"result" bson:"result"` + Status keptnv2.StatusType `json:"status" bson:"status"` + // EncodedProperties contains the aggregated results of the task's executors + EncodedProperties string `json:"encodedProperties" bson:"encodedProperties"` +} + +type TaskExecutionState struct { + Name string `json:"name" bson:"name"` + TriggeredID string `json:"triggeredID" bson:"triggeredID"` + Events []TaskEvent `json:"events" bson:"events"` +} + +func (s TaskExecutionState) DecodeEvents() []models.TaskEvent { + result := []models.TaskEvent{} + + for _, event := range s.Events { + newEvent := models.TaskEvent{ + EventType: event.EventType, + Source: event.Source, + Result: event.Result, + Status: event.Status, + Time: event.Time, + } + if event.EncodedProperties != "" { + properties := map[string]interface{}{} + if err := json.Unmarshal([]byte(event.EncodedProperties), &properties); err == nil { + newEvent.Properties = properties + } + } + result = append(result, newEvent) + } + return result +} + +type TaskEvent struct { + EventType string `json:"eventType" bson:"eventType"` + Source string `json:"source" bson:"source"` + Result keptnv2.ResultType `json:"result" bson:"result"` + Status keptnv2.StatusType `json:"status" bson:"status"` + Time string `json:"time" bson:"time"` + EncodedProperties string `json:"encodedProperties" bson:"encodedProperties"` +} + +func (e JsonStringEncodedSequenceExecution) ToSequenceExecution() models.SequenceExecution { + result := models.SequenceExecution{ + ID: e.ID, + SchemaVersion: SchemaVersionV1, + Sequence: keptnv2.Sequence{ + Name: e.Sequence.Name, + Tasks: e.Sequence.DecodeTasks(), + }, + Status: models.SequenceExecutionStatus{ + State: e.Status.State, + StateBeforePause: e.Status.StateBeforePause, + PreviousTasks: e.Status.DecodePreviousTasks(), + CurrentTask: models.TaskExecutionState{ + Name: e.Status.CurrentTask.Name, + TriggeredID: e.Status.CurrentTask.TriggeredID, + Events: e.Status.CurrentTask.DecodeEvents(), + }, + }, + Scope: e.Scope, + } + inputProperties := map[string]interface{}{} + err := json.Unmarshal([]byte(e.EncodedInputProperties), &inputProperties) + if err == nil { + result.InputProperties = inputProperties + } + return result +} diff --git a/shipyard-controller/db/models/sequence_execution/v1/sequence_execution_test.go b/shipyard-controller/db/models/sequence_execution/v1/sequence_execution_test.go new file mode 100644 index 0000000000..43d9330d36 --- /dev/null +++ b/shipyard-controller/db/models/sequence_execution/v1/sequence_execution_test.go @@ -0,0 +1,172 @@ +package v1 + +import ( + keptnv2 "github.com/keptn/go-utils/pkg/lib/v0_2_0" + "github.com/keptn/keptn/shipyard-controller/models" + "github.com/stretchr/testify/require" + "testing" +) + +var testSequenceExecution = models.SequenceExecution{ + ID: "id", + SchemaVersion: SchemaVersionV1, + Sequence: keptnv2.Sequence{ + Name: "delivery", + Tasks: []keptnv2.Task{ + { + Name: "deployment", + Properties: map[string]interface{}{ + "deployment.strategy": "direct", + }, + }, + { + Name: "evaluation", + }, + { + Name: "release", + }, + }, + }, + Status: models.SequenceExecutionStatus{ + State: "started", + StateBeforePause: "", + PreviousTasks: []models.TaskExecutionResult{ + { + Name: "deployment", + TriggeredID: "tr1", + Result: "pass", + Status: "succeeded", + Properties: map[string]interface{}{ + "foo.bar": "xyz", + }, + }, + { + Name: "evaluation", + TriggeredID: "tr2", + Result: "pass", + Status: "succeeded", + Properties: map[string]interface{}{ + "foo": map[string]interface{}{ + "bar": "xyz", + }, + }, + }, + }, + CurrentTask: models.TaskExecutionState{ + Name: "release", + TriggeredID: "tr3", + Events: []models.TaskEvent{ + { + EventType: keptnv2.GetStartedEventType("release"), + Source: "helm", + }, + { + EventType: keptnv2.GetFinishedEventType("release"), + Source: "helm", + Properties: map[string]interface{}{ + "release.xyz": "foo", + }, + }, + }, + }, + }, + Scope: models.EventScope{ + EventData: keptnv2.EventData{ + Project: "my-project", + Stage: "my-stage", + Service: "my-service", + }, + KeptnContext: "ctx1", + }, + InputProperties: map[string]interface{}{ + "foo.bar": "xyz", + }, +} + +var testJsonStringEncodedSequenceExecution = JsonStringEncodedSequenceExecution{ + ID: "id", + SchemaVersion: SchemaVersion{ + SchemaVersion: SchemaVersionV1, + }, + Sequence: Sequence{ + Name: "delivery", + Tasks: []Task{ + { + Name: "deployment", + EncodedProperties: `{"deployment.strategy":"direct"}`, + }, + { + Name: "evaluation", + }, + { + Name: "release", + }, + }, + }, + Status: SequenceExecutionStatus{ + State: "started", + StateBeforePause: "", + PreviousTasks: []TaskExecutionResult{ + { + Name: "deployment", + TriggeredID: "tr1", + Result: "pass", + Status: "succeeded", + EncodedProperties: `{"foo.bar":"xyz"}`, + }, + { + Name: "evaluation", + TriggeredID: "tr2", + Result: "pass", + Status: "succeeded", + EncodedProperties: `{"foo":{"bar":"xyz"}}`, + }, + }, + CurrentTask: TaskExecutionState{ + Name: "release", + TriggeredID: "tr3", + Events: []TaskEvent{ + { + EventType: keptnv2.GetStartedEventType("release"), + Source: "helm", + }, + { + EventType: keptnv2.GetFinishedEventType("release"), + Source: "helm", + EncodedProperties: `{"release.xyz":"foo"}`, + }, + }, + }, + }, + Scope: models.EventScope{ + EventData: keptnv2.EventData{ + Project: "my-project", + Stage: "my-stage", + Service: "my-service", + }, + KeptnContext: "ctx1", + }, + EncodedInputProperties: `{"foo.bar":"xyz"}`, +} + +func TestJsonStringEncodedSequenceExecution_ToSequenceExecution(t *testing.T) { + tests := []struct { + name string + obj JsonStringEncodedSequenceExecution + want models.SequenceExecution + wantErr bool + }{ + { + name: "transform back to sequence execution", + obj: testJsonStringEncodedSequenceExecution, + want: testSequenceExecution, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.obj.ToSequenceExecution() + require.Equal(t, tt.want, got) + }) + } +} diff --git a/shipyard-controller/db/models/sequence_execution/v1/transformer.go b/shipyard-controller/db/models/sequence_execution/v1/transformer.go new file mode 100644 index 0000000000..5c5bbd403b --- /dev/null +++ b/shipyard-controller/db/models/sequence_execution/v1/transformer.go @@ -0,0 +1,154 @@ +package v1 + +import ( + "encoding/json" + keptnv2 "github.com/keptn/go-utils/pkg/lib/v0_2_0" + "github.com/keptn/keptn/shipyard-controller/models" +) + +type ModelTransformer struct{} + +// TransformToDBModel transforms an instance of models.SequenceExecution to the db specific schema defined in this package +func (ModelTransformer) TransformToDBModel(execution models.SequenceExecution) interface{} { + return fromSequenceExecution(execution) +} + +func (ModelTransformer) TransformEventToDBModel(event models.TaskEvent) interface{} { + return transformTaskEvent(event) +} + +func (ModelTransformer) TransformToSequenceExecution(dbItem interface{}) (*models.SequenceExecution, error) { + data, _ := json.Marshal(dbItem) + + schemaInfo := &SchemaVersion{} + if err := json.Unmarshal(data, schemaInfo); err != nil { + return nil, err + } + + // if the current schema version is being used, we need to transform it to model.JsonStringEncodedSequenceExecution + if schemaInfo.SchemaVersion == SchemaVersionV1 { + internalSequenceExecution := &JsonStringEncodedSequenceExecution{} + if err := json.Unmarshal(data, internalSequenceExecution); err != nil { + return nil, err + } + transformedSequenceExecution := internalSequenceExecution.ToSequenceExecution() + return &transformedSequenceExecution, nil + } + + // if the old schema is still being used by that item, we can directly unmarshal it to a model.JsonStringEncodedSequenceExecution + sequenceExecution := &models.SequenceExecution{} + if err := json.Unmarshal(data, sequenceExecution); err != nil { + return nil, err + } + + return sequenceExecution, nil +} + +func fromSequenceExecution(se models.SequenceExecution) JsonStringEncodedSequenceExecution { + newSE := JsonStringEncodedSequenceExecution{ + ID: se.ID, + Sequence: Sequence{ + Name: se.Sequence.Name, + Tasks: transformTasks(se.Sequence.Tasks), + }, + Status: transformStatus(se.Status), + Scope: se.Scope, + SchemaVersion: SchemaVersion{SchemaVersion: SchemaVersionV1}, + } + if se.InputProperties != nil { + inputPropertiesJsonString, err := json.Marshal(se.InputProperties) + if err == nil { + newSE.EncodedInputProperties = string(inputPropertiesJsonString) + } + } + return newSE +} + +func transformTasks(tasks []keptnv2.Task) []Task { + result := []Task{} + + for _, task := range tasks { + newTask := Task{ + Name: task.Name, + TriggeredAfter: task.TriggeredAfter, + } + if task.Properties != nil { + taskPropertiesString, err := json.Marshal(task.Properties) + if err == nil { + newTask.EncodedProperties = string(taskPropertiesString) + } + } + result = append(result, newTask) + } + return result +} + +func transformStatus(status models.SequenceExecutionStatus) SequenceExecutionStatus { + newStatus := SequenceExecutionStatus{ + State: status.State, + StateBeforePause: status.StateBeforePause, + PreviousTasks: transformPreviousTasks(status.PreviousTasks), + CurrentTask: transformCurrentTask(status.CurrentTask), + } + + return newStatus +} + +func transformCurrentTask(task models.TaskExecutionState) TaskExecutionState { + newTaskExecutionState := TaskExecutionState{ + Name: task.Name, + TriggeredID: task.TriggeredID, + Events: transformTaskEvents(task.Events), + } + return newTaskExecutionState +} + +func transformTaskEvents(events []models.TaskEvent) []TaskEvent { + newTaskEvents := []TaskEvent{} + + for _, e := range events { + newTaskEvent := transformTaskEvent(e) + newTaskEvents = append(newTaskEvents, newTaskEvent) + } + return newTaskEvents +} + +func transformTaskEvent(e models.TaskEvent) TaskEvent { + newTaskEvent := TaskEvent{ + EventType: e.EventType, + Source: e.Source, + Result: e.Result, + Status: e.Status, + Time: e.Time, + } + + if e.Properties != nil { + properties, err := json.Marshal(e.Properties) + if err == nil { + newTaskEvent.EncodedProperties = string(properties) + } + } + return newTaskEvent +} + +func transformPreviousTasks(tasks []models.TaskExecutionResult) []TaskExecutionResult { + newPreviousTasks := []TaskExecutionResult{} + + for _, t := range tasks { + newPreviousTask := TaskExecutionResult{ + Name: t.Name, + TriggeredID: t.TriggeredID, + Result: t.Result, + Status: t.Status, + } + + if t.Properties != nil { + properties, err := json.Marshal(t.Properties) + if err == nil { + newPreviousTask.EncodedProperties = string(properties) + } + } + newPreviousTasks = append(newPreviousTasks, newPreviousTask) + } + return newPreviousTasks +} diff --git a/shipyard-controller/db/models/sequence_execution/v1/transformer_test.go b/shipyard-controller/db/models/sequence_execution/v1/transformer_test.go new file mode 100644 index 0000000000..7adeddb1da --- /dev/null +++ b/shipyard-controller/db/models/sequence_execution/v1/transformer_test.go @@ -0,0 +1,267 @@ +package v1 + +import ( + keptnv2 "github.com/keptn/go-utils/pkg/lib/v0_2_0" + "github.com/keptn/keptn/shipyard-controller/models" + "github.com/stretchr/testify/require" + "testing" +) + +func TestFromSequenceExecution(t *testing.T) { + type args struct { + se models.SequenceExecution + } + tests := []struct { + name string + args args + want JsonStringEncodedSequenceExecution + }{ + { + name: "transform sequence execution", + args: args{ + se: testSequenceExecution, + }, + want: testJsonStringEncodedSequenceExecution, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mt := ModelTransformer{} + got := mt.TransformToDBModel(tt.args.se) + require.Equal(t, tt.want, got) + }) + } +} + +func TestModelTransformer_TransformEventToDBModel(t *testing.T) { + type args struct { + event models.TaskEvent + } + tests := []struct { + name string + args args + want interface{} + }{ + { + name: "transform event", + args: args{ + event: models.TaskEvent{ + EventType: "mytask.triggered", + Source: "my-service", + Result: "pass", + Status: "succeeded", + Properties: map[string]interface{}{ + "foo": "bar", + }, + }, + }, + want: TaskEvent{ + EventType: "mytask.triggered", + Source: "my-service", + Result: "pass", + Status: "succeeded", + EncodedProperties: `{"foo":"bar"}`, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mo := ModelTransformer{} + got := mo.TransformEventToDBModel(tt.args.event) + + require.Equal(t, tt.want, got) + }) + } +} + +func TestModelTransformer_TransformToSequenceExecution(t *testing.T) { + type args struct { + dbItem interface{} + } + tests := []struct { + name string + args args + want *models.SequenceExecution + wantErr bool + }{ + { + name: "transform v1 schema", + args: args{ + dbItem: JsonStringEncodedSequenceExecution{ + ID: "1", + SchemaVersion: SchemaVersion{ + SchemaVersion: SchemaVersionV1, + }, + Sequence: Sequence{ + Name: "my-sequence", + Tasks: []Task{ + { + Name: "delivery", + TriggeredAfter: "1m", + EncodedProperties: `{"foo":"bar"}`, + }, + }, + }, + Status: SequenceExecutionStatus{ + State: "started", + }, + Scope: models.EventScope{ + EventData: keptnv2.EventData{ + Project: "my-project", + Stage: "my-stage", + Service: "my-service", + }, + }, + EncodedInputProperties: `{"foo":"bar"}`, + }, + }, + want: &models.SequenceExecution{ + ID: "1", + SchemaVersion: SchemaVersionV1, + Sequence: keptnv2.Sequence{ + Name: "my-sequence", + Tasks: []keptnv2.Task{ + { + Name: "delivery", + TriggeredAfter: "1m", + Properties: map[string]interface{}{ + "foo": "bar", + }, + }, + }, + }, + Status: models.SequenceExecutionStatus{ + State: "started", + PreviousTasks: []models.TaskExecutionResult{}, + CurrentTask: models.TaskExecutionState{ + Events: []models.TaskEvent{}, + }, + }, + Scope: models.EventScope{ + EventData: keptnv2.EventData{ + Project: "my-project", + Stage: "my-stage", + Service: "my-service", + }, + }, + InputProperties: map[string]interface{}{ + "foo": "bar", + }, + }, + wantErr: false, + }, + { + name: "transform previous schema", + args: args{ + dbItem: &models.SequenceExecution{ + ID: "1", + Sequence: keptnv2.Sequence{ + Name: "my-sequence", + Tasks: []keptnv2.Task{ + { + Name: "delivery", + TriggeredAfter: "1m", + Properties: map[string]interface{}{ + "foo": "bar", + }, + }, + }, + }, + Status: models.SequenceExecutionStatus{ + State: "started", + PreviousTasks: []models.TaskExecutionResult{}, + CurrentTask: models.TaskExecutionState{ + Events: []models.TaskEvent{}, + }, + }, + Scope: models.EventScope{ + EventData: keptnv2.EventData{ + Project: "my-project", + Stage: "my-stage", + Service: "my-service", + }, + }, + InputProperties: map[string]interface{}{ + "foo": "bar", + }, + }, + }, + want: &models.SequenceExecution{ + ID: "1", + Sequence: keptnv2.Sequence{ + Name: "my-sequence", + Tasks: []keptnv2.Task{ + { + Name: "delivery", + TriggeredAfter: "1m", + Properties: map[string]interface{}{ + "foo": "bar", + }, + }, + }, + }, + Status: models.SequenceExecutionStatus{ + State: "started", + PreviousTasks: []models.TaskExecutionResult{}, + CurrentTask: models.TaskExecutionState{ + Events: []models.TaskEvent{}, + }, + }, + Scope: models.EventScope{ + EventData: keptnv2.EventData{ + Project: "my-project", + Stage: "my-stage", + Service: "my-service", + }, + }, + InputProperties: map[string]interface{}{ + "foo": "bar", + }, + }, + wantErr: false, + }, + { + name: "invalid object", + args: args{ + dbItem: "invalid", + }, + want: nil, + wantErr: true, + }, + { + name: "invalid object with schema version 1", + args: args{ + dbItem: map[string]interface{}{ + "schemaVersion": "1", + "sequence": "invalid", + }, + }, + want: nil, + wantErr: true, + }, + { + name: "invalid object with unspecified schema version", + args: args{ + dbItem: map[string]interface{}{ + "schemaVersion": "0", + "sequence": "invalid", + }, + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mo := ModelTransformer{} + got, err := mo.TransformToSequenceExecution(tt.args.dbItem) + if tt.wantErr { + require.NotNil(t, err) + } else { + require.Nil(t, err) + } + + require.Equal(t, tt.want, got) + }) + } +} diff --git a/shipyard-controller/db/mongodb_sequence_execution_repo.go b/shipyard-controller/db/mongodb_sequence_execution_repo.go index bef2a71feb..ab8e0ff0ba 100644 --- a/shipyard-controller/db/mongodb_sequence_execution_repo.go +++ b/shipyard-controller/db/mongodb_sequence_execution_repo.go @@ -6,6 +6,8 @@ import ( "errors" "fmt" apimodels "github.com/keptn/go-utils/pkg/api/models" + "github.com/keptn/keptn/shipyard-controller/db/models/sequence_execution" + v02 "github.com/keptn/keptn/shipyard-controller/db/models/sequence_execution/v1" "github.com/keptn/keptn/shipyard-controller/models" log "github.com/sirupsen/logrus" "go.mongodb.org/mongo-driver/bson" @@ -22,12 +24,32 @@ const eventQueueSequenceStateCollectionName = "shipyard-controller-event-queue-s var ErrProjectNameMustNotBeEmpty = errors.New("project name must not be empty") var ErrSequenceIDMustNotBeEmpty = errors.New("sequence ID must not be empty") +type SequenceExecutionRepoOpt func(repo *MongoDBSequenceExecutionRepo) + +func WithSequenceExecutionModelTransformer(transformer sequence_execution.ModelTransformer) SequenceExecutionRepoOpt { + return func(repo *MongoDBSequenceExecutionRepo) { + repo.ModelTransformer = transformer + } +} + type MongoDBSequenceExecutionRepo struct { DbConnection *MongoDBConnection + // ModelTransformer allows transforming sequence execution objects before storing and after retrieving to and from the database. + // This is needed if the items for that collection should be stored in a different format while retaining its structure used outside the package + ModelTransformer sequence_execution.ModelTransformer } -func NewMongoDBSequenceExecutionRepo(dbConnection *MongoDBConnection) *MongoDBSequenceExecutionRepo { - return &MongoDBSequenceExecutionRepo{DbConnection: dbConnection} +func NewMongoDBSequenceExecutionRepo(dbConnection *MongoDBConnection, opts ...SequenceExecutionRepoOpt) *MongoDBSequenceExecutionRepo { + repo := &MongoDBSequenceExecutionRepo{ + DbConnection: dbConnection, + // use the v1 ModelTransformer by default + ModelTransformer: &v02.ModelTransformer{}, + } + + for _, opt := range opts { + opt(repo) + } + return repo } // Get returns all matching sequence executions, based on the given filter @@ -50,7 +72,7 @@ func (mdbrepo *MongoDBSequenceExecutionRepo) Get(filter models.SequenceExecution for cur.Next(ctx) { var outInterface interface{} err := cur.Decode(&outInterface) - sequenceExecution, err := transformBSONToSequenceExecution(outInterface) + sequenceExecution, err := mdbrepo.transformBSONToSequenceExecution(outInterface) if err != nil { log.Errorf("Could not decode sequenceExecution: %v", err) continue @@ -83,7 +105,7 @@ func (mdbrepo *MongoDBSequenceExecutionRepo) GetByTriggeredID(project, triggered if err := item.Decode(&outInterface); err != nil { return nil, err } - sequenceExecution, err := transformBSONToSequenceExecution(outInterface) + sequenceExecution, err := mdbrepo.transformBSONToSequenceExecution(outInterface) if err != nil { return nil, err } @@ -115,10 +137,21 @@ func (mdbrepo *MongoDBSequenceExecutionRepo) Upsert(item models.SequenceExecutio } opts := options.Update().SetUpsert(true) + var internalDBItem interface{} + if mdbrepo.ModelTransformer != nil { + internalDBItem = mdbrepo.ModelTransformer.TransformToDBModel(item) + } else { + internalDBItem = item + } + filter := bson.D{{"_id", item.ID}} - update := bson.D{{"$set", item}} - _, err = collection.UpdateOne(ctx, filter, update, opts) + if upsertOptions != nil && upsertOptions.Replace { + _, err = collection.ReplaceOne(ctx, filter, internalDBItem) + } else { + update := bson.D{{"$set", internalDBItem}} + _, err = collection.UpdateOne(ctx, filter, update, opts) + } if err != nil { return err } @@ -147,7 +180,14 @@ func (mdbrepo *MongoDBSequenceExecutionRepo) AppendTaskEvent(taskSequence models // by using the $push operator in the FindOneAndUpdate function, we ensure that we follow an append-only approach to this property, // since this is the one property that can potentially be updated by multiple threads handling .finished/.started events for the same task - update := bson.M{"$push": bson.M{"status.currentTask.events": event}} + var eventItem interface{} + if mdbrepo.ModelTransformer != nil { + // if we have a transformer that defines how the event should be stored internally, use it to transform the item + eventItem = mdbrepo.ModelTransformer.TransformEventToDBModel(event) + } else { + eventItem = event + } + update := bson.M{"$push": bson.M{"status.currentTask.events": eventItem}} res := collection.FindOneAndUpdate(ctx, filter, update, opts) if res.Err() != nil { @@ -159,7 +199,7 @@ func (mdbrepo *MongoDBSequenceExecutionRepo) AppendTaskEvent(taskSequence models if err != nil { return nil, err } - sequenceExecution, err := transformBSONToSequenceExecution(outInterface) + sequenceExecution, err := mdbrepo.transformBSONToSequenceExecution(outInterface) if err != nil { return nil, err } @@ -201,7 +241,7 @@ func (mdbrepo *MongoDBSequenceExecutionRepo) UpdateStatus(taskSequence models.Se if err != nil { return nil, err } - sequenceExecution, err := transformBSONToSequenceExecution(outInterface) + sequenceExecution, err := mdbrepo.transformBSONToSequenceExecution(outInterface) if err != nil { return nil, err } @@ -353,19 +393,28 @@ func (mdbrepo *MongoDBSequenceExecutionRepo) updateGlobalSequenceContext(eventSc return err } -func transformBSONToSequenceExecution(outInterface interface{}) (*models.SequenceExecution, error) { +func (mdbrepo *MongoDBSequenceExecutionRepo) transformBSONToSequenceExecution(outInterface interface{}) (*models.SequenceExecution, error) { outInterface, err := flattenRecursively(outInterface) if err != nil { return nil, err } - data, _ := json.Marshal(outInterface) + // if we have a model transformer, use that one to transform the item into a model.SequenceExecution object + if mdbrepo.ModelTransformer != nil { + return mdbrepo.ModelTransformer.TransformToSequenceExecution(outInterface) + } + + // if we have no model transformer, directly decode to models.SequenceExecution + data, err := json.Marshal(outInterface) + if err != nil { + return nil, err + } sequenceExecution := &models.SequenceExecution{} if err := json.Unmarshal(data, sequenceExecution); err != nil { return nil, err } - //sequenceExecution.ID = outInterface.(map[string]interface{})["_id"].(string) + return sequenceExecution, nil } diff --git a/shipyard-controller/db/mongodb_sequence_execution_repo_test.go b/shipyard-controller/db/mongodb_sequence_execution_repo_test.go index 5aa73b67d7..d0f3faf04c 100644 --- a/shipyard-controller/db/mongodb_sequence_execution_repo_test.go +++ b/shipyard-controller/db/mongodb_sequence_execution_repo_test.go @@ -30,6 +30,7 @@ func TestMongoDBTaskSequenceV2Repo_InsertAndRetrieve(t *testing.T) { require.Nil(t, err) require.Len(t, get, 1) + get[0].SchemaVersion = "" require.Equal(t, sequence, get[0]) err = mdbrepo.Clear("my-project") @@ -63,6 +64,7 @@ func TestMongoDBTaskSequenceV2Repo_GetByTriggeredID(t *testing.T) { require.Nil(t, err) require.Len(t, get, 1) + get[0].SchemaVersion = "" require.Equal(t, sequence, get[0]) sequenceByTriggeredID, err := mdbrepo.GetByTriggeredID("my-project", "my-triggered-id") @@ -104,6 +106,7 @@ func TestMongoDBTaskSequenceV2Repo_AppendTaskEvent(t *testing.T) { require.Nil(t, err) require.Len(t, get, 1) + get[0].SchemaVersion = "" require.Equal(t, sequence, get[0]) triggeredEvent := models.TaskEvent{ @@ -115,8 +118,8 @@ func TestMongoDBTaskSequenceV2Repo_AppendTaskEvent(t *testing.T) { require.Nil(t, err) - require.Len(t, result.Status.CurrentTask.Events, 1) - require.Equal(t, triggeredEvent, result.Status.CurrentTask.Events[0]) + require.Len(t, result.Status.CurrentTask.Events, 2) + require.Equal(t, triggeredEvent, result.Status.CurrentTask.Events[1]) } func TestMongoDBTaskSequenceV2Repo_AppendTaskEventMultipleWriters(t *testing.T) { @@ -137,6 +140,7 @@ func TestMongoDBTaskSequenceV2Repo_AppendTaskEventMultipleWriters(t *testing.T) require.Nil(t, err) require.Len(t, get, 1) + get[0].SchemaVersion = "" require.Equal(t, sequence, get[0]) triggeredEvent := models.TaskEvent{ @@ -171,7 +175,7 @@ func TestMongoDBTaskSequenceV2Repo_AppendTaskEventMultipleWriters(t *testing.T) require.Nil(t, err) require.Len(t, get, 1) - require.Len(t, get[0].Status.CurrentTask.Events, nrConcurrentWrites) + require.Len(t, get[0].Status.CurrentTask.Events, nrConcurrentWrites+1) } func TestMongoDBTaskSequenceV2Repo_UpdateStatus(t *testing.T) { @@ -192,6 +196,7 @@ func TestMongoDBTaskSequenceV2Repo_UpdateStatus(t *testing.T) { require.Nil(t, err) require.Len(t, get, 1) + get[0].SchemaVersion = "" require.Equal(t, sequence, get[0]) require.Nil(t, err) @@ -222,16 +227,29 @@ func getTestSequenceExecution() (models.EventScope, models.SequenceExecution) { Tasks: []keptnv2.Task{ { Name: "deploy", + Properties: map[string]interface{}{ + "deployment-strategy": "direct", + }, }, }, }, Status: models.SequenceExecutionStatus{ State: "triggered", - PreviousTasks: nil, + PreviousTasks: []models.TaskExecutionResult{}, CurrentTask: models.TaskExecutionState{ Name: "deploy", TriggeredID: "1234", - Events: []models.TaskEvent{}, + Events: []models.TaskEvent{ + { + EventType: "deployment.finished", + Source: "my-service", + Result: "pass", + Status: "succeeded", + Properties: map[string]interface{}{ + "deploymentURI": "my-url", + }, + }, + }, }, }, Scope: scope, diff --git a/shipyard-controller/main.go b/shipyard-controller/main.go index 62bef9a745..86efdf06fa 100644 --- a/shipyard-controller/main.go +++ b/shipyard-controller/main.go @@ -264,6 +264,14 @@ func main() { } log.Info("Finished migrating project key format") + log.Info("Migrating sequence execution format") + sequenceExecutionMigrator := migration.NewSequenceExecutionMigrator(db.GetMongoDBConnectionInstance()) + err = sequenceExecutionMigrator.Run() + if err != nil { + log.Errorf("Unable to run sequence execution migrator: %v", err) + } + log.Info("Finished migrating sequence execution format") + healthHandler := handler.NewHealthHandler() healthController := controller.NewHealthController(healthHandler) healthController.Inject(apiHealth) diff --git a/shipyard-controller/models/sequence_execution.go b/shipyard-controller/models/sequence_execution.go index 9923343f07..8bac999ca5 100644 --- a/shipyard-controller/models/sequence_execution.go +++ b/shipyard-controller/models/sequence_execution.go @@ -12,6 +12,8 @@ import ( // Also, for multiple iterations of a sequence, each iteration will get a new instance. type SequenceExecution struct { ID string `json:"_id" bson:"_id"` + // SchemaVersion indicates the scheme that is used for the internal representation of the sequence execution + SchemaVersion string `json:"schemaVersion" bson:"schemaVersion"` // Sequence contains the complete sequence definition Sequence keptnv2.Sequence `json:"sequence" bson:"sequence"` Status SequenceExecutionStatus `json:"status" bson:"status"` @@ -262,4 +264,5 @@ type SequenceExecutionFilter struct { type SequenceExecutionUpsertOptions struct { CheckUniqueTriggeredID bool + Replace bool }