Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1 fix 332 and 333 #334

Merged
merged 21 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
06f398f
adding watcher for an operation
kevinhillinger Jun 12, 2023
e7b86c3
wiring up watcher initially to deploystage
kevinhillinger Jun 12, 2023
d0aa1b2
updating with comments
kevinhillinger Jun 12, 2023
a17df60
storing snapshot of arm template prior to deploy
kevinhillinger Jun 12, 2023
8cb6243
updating operation executor to call done on the context
kevinhillinger Jun 12, 2023
90677dd
creating new ctx for watcher
kevinhillinger Jun 12, 2023
60110a8
adding find poller
bobjac Jun 12, 2023
609973b
adding buffered channel for finduntildone
bobjac Jun 12, 2023
dce418e
ability to cancel watcher of the parent
kevinhillinger Jun 12, 2023
edb89a7
removing trace from operation manager
kevinhillinger Jun 12, 2023
f0a8285
updating channel to have a buffer of 1 in the finder
kevinhillinger Jun 12, 2023
30d4d32
pushing bicep template
kevinhillinger Jun 12, 2023
c16a3f2
push template
kevinhillinger Jun 12, 2023
1029560
adding sh script
bobjac Jun 12, 2023
e41ff0a
success with depends on template
kevinhillinger Jun 12, 2023
3782e73
Merge branch 'v1-fix/332' of https://github.com/microsoft/commercial-…
bobjac Jun 12, 2023
aee967b
cancellation token
kevinhillinger Jun 12, 2023
c10ec7c
adding deploymentScript.bicep
bobjac Jun 12, 2023
31be8c9
Merge branch 'v1-fix/332' of https://github.com/microsoft/commercial-…
bobjac Jun 12, 2023
7fc97a8
adding continuation
kevinhillinger Jun 12, 2023
0176872
Merge branch 'v1-fix/332' of github.com:microsoft/commercial-marketpl…
kevinhillinger Jun 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/operator/handlers/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ func (h *operationMessageHandler) Handle(message *messaging.ExecuteInvokedOperat

func NewOperationsMessageHandler(appConfig *config.AppConfig) *operationMessageHandler {
handler := &operationMessageHandler{}
service, err := newOperationService(appConfig)
manager, err := newOperationManager(appConfig)
if err != nil {
log.Errorf("Error creating operations message handler: %s", err)
return nil
}

repository, err := operation.NewRepository(service, operations.NewOperationFuncProvider(appConfig))
repository, err := operation.NewRepository(manager, operations.NewOperationFuncProvider(appConfig))
if err != nil {
log.Errorf("Error creating operations message handler: %s", err)
return nil
Expand All @@ -43,7 +43,7 @@ func NewOperationsMessageHandler(appConfig *config.AppConfig) *operationMessageH
return handler
}

func newOperationService(appConfig *config.AppConfig) (*operation.OperationManager, error) {
func newOperationManager(appConfig *config.AppConfig) (*operation.OperationManager, error) {
db := data.NewDatabase(appConfig.GetDatabaseOptions()).Instance()

credential, err := azidentity.NewDefaultAzureCredential(nil)
Expand Down
4 changes: 4 additions & 0 deletions cmd/operator/operations/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func (op *deployOperation) do(context operation.ExecutionContext) error {
}

azureDeployment := op.mapAzureDeployment(context.Operation(), deployStageOperations)

// save the built arm template to the operation's attributes so we have a snapshot of what was submitted
context.Attribute(model.AttributeKeyArmTemplate, azureDeployment.Template)

deployer, err := op.newDeployer(azureDeployment.SubscriptionId)
if err != nil {
return err
Expand Down
101 changes: 93 additions & 8 deletions cmd/operator/operations/deploystage.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
package operations

import (
"errors"
"fmt"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/labstack/gommon/log"
"github.com/microsoft/commercial-marketplace-offer-deploy/internal/config"
"github.com/microsoft/commercial-marketplace-offer-deploy/internal/data"
"github.com/microsoft/commercial-marketplace-offer-deploy/internal/hook"
"github.com/microsoft/commercial-marketplace-offer-deploy/internal/messaging"
"github.com/microsoft/commercial-marketplace-offer-deploy/internal/model"
"github.com/microsoft/commercial-marketplace-offer-deploy/internal/model/operation"
"github.com/microsoft/commercial-marketplace-offer-deploy/internal/model/stage"
"github.com/microsoft/commercial-marketplace-offer-deploy/internal/threading"
"github.com/microsoft/commercial-marketplace-offer-deploy/sdk"
)

Expand All @@ -13,39 +23,68 @@ type nameFinderFactory func(context operation.ExecutionContext) (*operation.Azur
// deployStageOperation groups the collaborators used to execute a deploy-stage operation.
type deployStageOperation struct {
// creates the poller that wait uses for the stage's Azure deployment
pollerFactory *stage.DeployStagePollerFactory
// builds the finder that Do uses to look up the Azure deployment name
nameFinderFactory nameFinderFactory
// used by watchParentOperation to watch the parent operation
watcher operation.OperationWatcher
}

func (op *deployStageOperation) Do(context operation.ExecutionContext) error {
finder, err := op.nameFinderFactory(context)
func (op *deployStageOperation) Do(executionContext operation.ExecutionContext) error {
token, err := op.watchParentOperation(executionContext)
if err != nil {
return err
}

finder, err := op.nameFinderFactory(executionContext)
if err != nil {
return err
}

azureDeploymentName, err := finder.Find(context.Context())
azureDeploymentName, err := finder.FindUntilDone(token.Context())
if err != nil {
return err
}

// save the deployment name to the operation so we can fetch it later
context.Operation().Attribute(model.AttributeKeyAzureDeploymentName, azureDeploymentName)
context.SaveChanges()
executionContext.Operation().Attribute(model.AttributeKeyAzureDeploymentName, azureDeploymentName)
executionContext.SaveChanges()

isFirstAttempt := context.Operation().IsFirstAttempt()
isFirstAttempt := executionContext.Operation().IsFirstAttempt()
if isFirstAttempt {
err := op.wait(context, azureDeploymentName)
err := op.wait(executionContext, azureDeploymentName)
if err != nil {
return err
}
} else { // retry the stage
retryStage := NewRetryStageOperation()
err := retryStage(context)
err := retryStage(executionContext)
if err != nil {
return err
}
}

token.Cancel()

return nil
}

// watchParentOperation starts a watcher on the parent of the operation being executed.
//
// The watch condition is met when the parent's status is failed AND the parent is completed;
// the watcher evaluates it every 5 seconds. The returned token exposes the context that Do
// passes to the name finder, and a Cancel method to stop watching.
// NOTE(review): the watcher presumably cancels the token's context once the condition is met — confirm in operation.OperationWatcher.
//
// Returns an error if the operation has no parent id, or if the watcher could not be started
// (the underlying cause is wrapped and available through errors.Is / errors.As).
func (op *deployStageOperation) watchParentOperation(context operation.ExecutionContext) (threading.CancellationToken, error) {
	parentId := context.Operation().ParentID
	if parentId == nil {
		return nil, errors.New("parent operation id is nil")
	}

	options := operation.OperationWatcherOptions{
		Condition: func(parent model.InvokedOperation) bool {
			return parent.Status == sdk.StatusFailed.String() && parent.IsCompleted()
		},
		Frequency: 5 * time.Second,
	}

	token, err := op.watcher.Watch(*parentId, options)
	if err != nil {
		// keep the cause: previously the watcher's error was dropped from the message
		return nil, fmt.Errorf("failed to start watcher for parent operation [%s]: %w", *parentId, err)
	}
	return token, nil
}

func (op *deployStageOperation) wait(context operation.ExecutionContext, azureDeploymentName string) error {
poller, err := op.pollerFactory.Create(context.Operation(), azureDeploymentName, nil)
if err != nil {
Expand All @@ -68,11 +107,57 @@ func (op *deployStageOperation) wait(context operation.ExecutionContext, azureDe
// NewDeployStageOperation builds the deploy-stage operation and returns its Do method as the
// operation function. If the operation repository cannot be created, the error is logged and
// nil is returned.
func NewDeployStageOperation(appConfig *config.AppConfig) operation.OperationFunc {
	pollerFactory := stage.NewDeployStagePollerFactory()

	repository, err := newOperationRepository(appConfig)
	if err != nil {
		log.Errorf("failed construct deployStage operation: %s", err)
		return nil
	}

	// the finder is created per execution from the operation being executed
	findName := func(executionContext operation.ExecutionContext) (*operation.AzureDeploymentNameFinder, error) {
		return operation.NewAzureDeploymentNameFinder(executionContext.Operation())
	}

	deployStage := &deployStageOperation{
		watcher:           operation.NewWatcher(repository),
		pollerFactory:     pollerFactory,
		nameFinderFactory: findName,
	}
	return deployStage.Do
}

// newOperationRepository creates an operation repository on top of a freshly built operation
// manager. No operation function provider is supplied (nil is passed to NewRepository).
func newOperationRepository(appConfig *config.AppConfig) (operation.Repository, error) {
	manager, managerErr := newOperationManager(appConfig)
	if managerErr != nil {
		return nil, managerErr
	}

	repo, repoErr := operation.NewRepository(manager, nil)
	if repoErr != nil {
		return nil, repoErr
	}

	return repo, nil
}

// newOperationManager wires up an operation manager from the application configuration:
// the database instance, a Service Bus message sender authenticated with the default Azure
// credential, and hook.Notify as the notification function.
// Returns the first error raised while creating the credential, the sender, or the manager.
func newOperationManager(appConfig *config.AppConfig) (*operation.OperationManager, error) {
	database := data.NewDatabase(appConfig.GetDatabaseOptions()).Instance()

	azureCredential, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		return nil, err
	}

	senderOptions := messaging.MessageSenderOptions{
		SubscriptionId:          appConfig.Azure.SubscriptionId,
		Location:                appConfig.Azure.Location,
		ResourceGroupName:       appConfig.Azure.ResourceGroupName,
		FullyQualifiedNamespace: appConfig.Azure.GetFullQualifiedNamespace(),
	}
	messageSender, err := messaging.NewServiceBusMessageSender(azureCredential, senderOptions)
	if err != nil {
		return nil, err
	}

	manager, err := operation.NewManager(database, messageSender, hook.Notify)
	if err != nil {
		return nil, err
	}
	return manager, nil
}
9 changes: 9 additions & 0 deletions internal/model/invokedoperation.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ func (o *InvokedOperation) Running() {
o.setStatus(sdk.StatusRunning.String())
}

// IsContinuation reports whether this execution is picking an interrupted operation back up:
// the operation is not completed, is running, and has not exceeded its attempts.
func (o *InvokedOperation) IsContinuation() bool {
	// checks are evaluated left to right and short-circuit, completed first
	return !o.IsCompleted() && o.IsRunning() && !o.AttemptsExceeded()
}

func (o *InvokedOperation) FirstResult() *InvokedOperationResult {
if len(o.Results) == 0 {
return o.appendResult()
Expand Down
1 change: 1 addition & 0 deletions internal/model/invokedoperationattribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
AttributeKeyCorrelationId AttributeKey = "correlationId"
AttributeKeyAzureDeploymentName AttributeKey = "azureDeploymentName"
AttributeKeyResumeToken AttributeKey = "resumeToken"
AttributeKeyArmTemplate AttributeKey = "armTemplate"
)

type InvokedOperationAttribute struct {
Expand Down
22 changes: 11 additions & 11 deletions internal/model/operation/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,30 @@ type executor struct {
}

// default implementation for executing an operation
func (exe *executor) Execute(context ExecutionContext) error {
if reasons, ok := context.Operation().IsExecutable(); !ok {
func (exe *executor) Execute(executionContext ExecutionContext) error {
if reasons, ok := executionContext.Operation().IsExecutable(); !ok {
log.Infof("Operation is not in an executable state: %s", reasons)
return nil
}

err := context.Running()
err := executionContext.Running()

if err != nil {
log.Errorf("error updating operation to running: %s", err.Error())
return err
}

do := WithLogging(exe.operation)
err = do(context)
err = do(executionContext)

if err != nil {
context.Error(err)
executionContext.Error(err)

if context.Operation().AttemptsExceeded() {
err = context.Failed()
context.Complete()
if executionContext.Operation().AttemptsExceeded() {
err = executionContext.Failed()
executionContext.Complete()
} else {
retryErr := context.Retry()
retryErr := executionContext.Retry()
if retryErr != nil {
log.Errorf("attempt to retry operation caused error: %s", retryErr.Error())
}
Expand All @@ -68,8 +68,8 @@ func (exe *executor) Execute(context ExecutionContext) error {
return err
}

context.Success()
context.Complete()
executionContext.Success()
executionContext.Complete()

return nil
}
Expand Down
40 changes: 34 additions & 6 deletions internal/model/operation/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package operation
import (
"context"
"errors"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources"
Expand All @@ -11,9 +12,14 @@ import (
log "github.com/sirupsen/logrus"
)

// FinderResponse is the value carried on an AzureDeploymentNameFinder's done channel.
type FinderResponse struct {
// the deployment name; FindUntilDone returns it to the caller
Name string
}
// AzureDeploymentNameFinder looks up the name of the Azure deployment that belongs to an operation.
type AzureDeploymentNameFinder struct {
// Azure SDK client for deployments
client *armresources.DeploymentsClient
// id of the operation whose deployment is being looked up; also used in log output
operationId uuid.UUID
// paces the lookups made by FindUntilDone (NewAzureDeploymentNameFinder sets it to 10 seconds)
ticker *time.Ticker
// FindUntilDone receives its result from this channel (created with a buffer of 1)
done chan FinderResponse
// taken from the deployment's resource group in NewAzureDeploymentNameFinder
resourceGroupName string
}

Expand All @@ -34,13 +40,39 @@ func NewAzureDeploymentNameFinder(operation *Operation) (*AzureDeploymentNameFin
}
return &AzureDeploymentNameFinder{
client: client,
ticker: time.NewTicker(10 * time.Second),
done: make(chan FinderResponse, 1),
resourceGroupName: deployment.ResourceGroup,
operationId: operation.ID,
}, nil
}

func (finder *AzureDeploymentNameFinder) Find(ctx context.Context) (string, error) {
return finder.getName(ctx)
// FindUntilDone looks up the deployment name on every tick of the finder's ticker until a
// non-empty name is found or ctx is done.
//
// Returns the name and a nil error once found. If ctx is done first, returns an empty name and
// ctx.Err(). A failed lookup is logged and tried again on the next tick; it does not end the
// search. The ticker is stopped when the method returns.
func (finder *AzureDeploymentNameFinder) FindUntilDone(ctx context.Context) (string, error) {
	defer finder.ticker.Stop()

	for {
		select {
		case <-finder.ticker.C:
			log.Tracef("Finding deployment name for operationId: %s", finder.operationId)
			name, err := finder.getName(ctx)
			if err != nil {
				log.WithError(err).Errorf("Failed to find deployment name for operationId: %s", finder.operationId)
			}
			if len(name) > 0 {
				// return directly rather than sending to finder.done from the goroutine that
				// also reads it: with the buffer already full, another tick would block
				// forever on the send
				finder.logDone(name)
				return name, nil
			}
		case response := <-finder.done:
			// still honoured in case a result is delivered through the channel
			finder.logDone(response.Name)
			return response.Name, nil
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
}

// logDone traces the name that ended the search.
func (finder *AzureDeploymentNameFinder) logDone(name string) {
	log.WithFields(log.Fields{
		"operationId": finder.operationId,
		"name":        name,
	}).Trace("name finder done")
}

// get by correlationId
Expand Down Expand Up @@ -82,9 +114,5 @@ func (finder *AzureDeploymentNameFinder) getName(ctx context.Context) (string, e
}
}

if name == "" {
log.WithField("operationId", finder.operationId).Warn("Failed to find deployment by operationId")
return name, errors.New("failed to find deployment by operationId")
}
return name, nil
}
7 changes: 6 additions & 1 deletion internal/model/operation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ func (service *OperationManager) new(i *model.InvokedOperation) (*model.InvokedO
return i, nil
}

// any reports whether at least one InvokedOperation row has the given id.
// The result of the count query is not inspected for errors.
// NOTE(review): a failed query presumably leaves the count at zero and so reports false — confirm.
func (service *OperationManager) any(id uuid.UUID) bool {
	var matches int64
	query := service.db.Model(&model.InvokedOperation{}).Where("id = ?", id)
	query.Count(&matches)
	return matches > 0
}

// initializes and returns the single instance of InvokedOperation by the context's id
// if the id is invalid and an instance cannot be found, returns an error
func (service *OperationManager) initialize(id uuid.UUID) (*Operation, error) {
Expand All @@ -150,7 +156,6 @@ func (service *OperationManager) initialize(id uuid.UUID) (*Operation, error) {
manager: service,
}

service.log.Trace("operation service initialized.")
return service.operation, nil
}

Expand Down
5 changes: 5 additions & 0 deletions internal/model/operation/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type OperationFuncProvider interface {
// Repository is the interface for creating and looking up operations.
type Repository interface {
// New returns a new operation of the given type, or an error.
// NOTE(review): configure presumably customizes the new operation — confirm against the implementation.
New(operationType sdk.OperationType, configure Configure) (*Operation, error)
// First returns the operation with the given id, or an error.
First(id uuid.UUID) (*Operation, error)
// Any reports whether an operation with the given id exists.
Any(id uuid.UUID) bool
// Provider takes an operation function provider.
// NOTE(review): presumably replaces the provider given to NewRepository — confirm.
Provider(provider OperationFuncProvider) error
// WithContext takes the context for the repository to use.
// NOTE(review): scope and lifetime not visible here — confirm.
WithContext(ctx context.Context)
}
Expand Down Expand Up @@ -93,6 +94,10 @@ func (repo *repository) First(id uuid.UUID) (*Operation, error) {
return operation, nil
}

// Any reports whether an operation with the given id exists, as answered by the underlying manager.
func (repo *repository) Any(id uuid.UUID) bool {
	exists := repo.manager.any(id)
	return exists
}

// NewRepository creates a new operation repository
// appConfig: application configuration
// provider: operation function provider, optional if the operation is not going to be executed and you want to interact with the operation
Expand Down
Loading