Skip to content

Commit

Permalink
v1 fix 332 and 333 (#334)
Browse files Browse the repository at this point in the history
* adding watcher for an operation

* wiring up watcher initially to deploystage

* updating with comments

* storing snapshot of arm template prior to deploy

* updating operation executor to call done on the context

* creating new ctx for watcher

* adding find poller

* adding buffered channel for finduntildone

* ability to cancel watcher of the parent

* removing trace from operation manager

* updating channel to have a buffer of 1 in the finder

* pushing bicep template

* push template

* adding sh script

* success with depends on template

* cancellation token

* adding deploymentScript.bicep

* adding continuation

---------

Co-authored-by: Bob Jacobs <bob.jacobs@outlook.com>
  • Loading branch information
kevinhillinger and bobjac committed Jun 12, 2023
1 parent c7f4be2 commit a9d0875
Show file tree
Hide file tree
Showing 18 changed files with 509 additions and 34 deletions.
6 changes: 3 additions & 3 deletions cmd/operator/handlers/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ func (h *operationMessageHandler) Handle(message *messaging.ExecuteInvokedOperat

func NewOperationsMessageHandler(appConfig *config.AppConfig) *operationMessageHandler {
handler := &operationMessageHandler{}
service, err := newOperationService(appConfig)
manager, err := newOperationManager(appConfig)
if err != nil {
log.Errorf("Error creating operations message handler: %s", err)
return nil
}

repository, err := operation.NewRepository(service, operations.NewOperationFuncProvider(appConfig))
repository, err := operation.NewRepository(manager, operations.NewOperationFuncProvider(appConfig))
if err != nil {
log.Errorf("Error creating operations message handler: %s", err)
return nil
Expand All @@ -43,7 +43,7 @@ func NewOperationsMessageHandler(appConfig *config.AppConfig) *operationMessageH
return handler
}

func newOperationService(appConfig *config.AppConfig) (*operation.OperationManager, error) {
func newOperationManager(appConfig *config.AppConfig) (*operation.OperationManager, error) {
db := data.NewDatabase(appConfig.GetDatabaseOptions()).Instance()

credential, err := azidentity.NewDefaultAzureCredential(nil)
Expand Down
4 changes: 4 additions & 0 deletions cmd/operator/operations/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func (op *deployOperation) do(context operation.ExecutionContext) error {
}

azureDeployment := op.mapAzureDeployment(context.Operation(), deployStageOperations)

// save the built arm template to the operation's attributes so we have a snapshot of what was submitted
context.Attribute(model.AttributeKeyArmTemplate, azureDeployment.Template)

deployer, err := op.newDeployer(azureDeployment.SubscriptionId)
if err != nil {
return err
Expand Down
101 changes: 93 additions & 8 deletions cmd/operator/operations/deploystage.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
package operations

import (
"errors"
"fmt"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/labstack/gommon/log"
"github.com/microsoft/commercial-marketplace-offer-deploy/internal/config"
"github.com/microsoft/commercial-marketplace-offer-deploy/internal/data"
"github.com/microsoft/commercial-marketplace-offer-deploy/internal/hook"
"github.com/microsoft/commercial-marketplace-offer-deploy/internal/messaging"
"github.com/microsoft/commercial-marketplace-offer-deploy/internal/model"
"github.com/microsoft/commercial-marketplace-offer-deploy/internal/model/operation"
"github.com/microsoft/commercial-marketplace-offer-deploy/internal/model/stage"
"github.com/microsoft/commercial-marketplace-offer-deploy/internal/threading"
"github.com/microsoft/commercial-marketplace-offer-deploy/sdk"
)

Expand All @@ -13,39 +23,68 @@ type nameFinderFactory func(context operation.ExecutionContext) (*operation.Azur
type deployStageOperation struct {
pollerFactory *stage.DeployStagePollerFactory
nameFinderFactory nameFinderFactory
watcher operation.OperationWatcher
}

func (op *deployStageOperation) Do(context operation.ExecutionContext) error {
finder, err := op.nameFinderFactory(context)
func (op *deployStageOperation) Do(executionContext operation.ExecutionContext) error {
token, err := op.watchParentOperation(executionContext)
if err != nil {
return err
}

finder, err := op.nameFinderFactory(executionContext)
if err != nil {
return err
}

azureDeploymentName, err := finder.Find(context.Context())
azureDeploymentName, err := finder.FindUntilDone(token.Context())
if err != nil {
return err
}

// save the deployment name to the operation so we can fetch it later
context.Operation().Attribute(model.AttributeKeyAzureDeploymentName, azureDeploymentName)
context.SaveChanges()
executionContext.Operation().Attribute(model.AttributeKeyAzureDeploymentName, azureDeploymentName)
executionContext.SaveChanges()

isFirstAttempt := context.Operation().IsFirstAttempt()
isFirstAttempt := executionContext.Operation().IsFirstAttempt()
if isFirstAttempt {
err := op.wait(context, azureDeploymentName)
err := op.wait(executionContext, azureDeploymentName)
if err != nil {
return err
}
} else { // retry the stage
retryStage := NewRetryStageOperation()
err := retryStage(context)
err := retryStage(executionContext)
if err != nil {
return err
}
}

token.Cancel()

return nil
}

// watches the parent deploy operation for failure or completed state
// it will trigger a cancellation of the ctx on the execution context if the condition is met
func (op *deployStageOperation) watchParentOperation(context operation.ExecutionContext) (threading.CancellationToken, error) {
parentId := context.Operation().ParentID
if parentId == nil {
return nil, errors.New("parent operation id is nil")
}
options := operation.OperationWatcherOptions{
Condition: func(operation model.InvokedOperation) bool {
return operation.Status == sdk.StatusFailed.String() && operation.IsCompleted()
},
Frequency: 5 * time.Second,
}
token, err := op.watcher.Watch(*parentId, options)
if err != nil {
return nil, fmt.Errorf("failed to start watcher for parent operation [%s]", *parentId)
}
return token, nil
}

func (op *deployStageOperation) wait(context operation.ExecutionContext, azureDeploymentName string) error {
poller, err := op.pollerFactory.Create(context.Operation(), azureDeploymentName, nil)
if err != nil {
Expand All @@ -68,11 +107,57 @@ func (op *deployStageOperation) wait(context operation.ExecutionContext, azureDe
func NewDeployStageOperation(appConfig *config.AppConfig) operation.OperationFunc {
pollerFactory := stage.NewDeployStagePollerFactory()

repository, err := newOperationRepository(appConfig)
if err != nil {
log.Errorf("failed construct deployStage operation: %s", err)
return nil
}

operation := &deployStageOperation{
watcher: operation.NewWatcher(repository),
pollerFactory: pollerFactory,
nameFinderFactory: func(context operation.ExecutionContext) (*operation.AzureDeploymentNameFinder, error) {
return operation.NewAzureDeploymentNameFinder(context.Operation())
},
}
return operation.Do
}

func newOperationRepository(appConfig *config.AppConfig) (operation.Repository, error) {
manager, err := newOperationManager(appConfig)
if err != nil {
return nil, err
}

repository, err := operation.NewRepository(manager, nil)
if err != nil {
return nil, err
}
return repository, nil
}

func newOperationManager(appConfig *config.AppConfig) (*operation.OperationManager, error) {
db := data.NewDatabase(appConfig.GetDatabaseOptions()).Instance()

credential, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
return nil, err
}

sender, err := messaging.NewServiceBusMessageSender(credential, messaging.MessageSenderOptions{
SubscriptionId: appConfig.Azure.SubscriptionId,
Location: appConfig.Azure.Location,
ResourceGroupName: appConfig.Azure.ResourceGroupName,
FullyQualifiedNamespace: appConfig.Azure.GetFullQualifiedNamespace(),
})

if err != nil {
return nil, err
}

service, err := operation.NewManager(db, sender, hook.Notify)
if err != nil {
return nil, err
}
return service, nil
}
9 changes: 9 additions & 0 deletions internal/model/invokedoperation.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ func (o *InvokedOperation) Running() {
o.setStatus(sdk.StatusRunning.String())
}

// whether the operation execution is being picked up where it last left off if interrupted
func (o *InvokedOperation) IsContinuation() bool {
if o.IsCompleted() {
return false
}

return o.IsRunning() && !o.AttemptsExceeded()
}

func (o *InvokedOperation) FirstResult() *InvokedOperationResult {
if len(o.Results) == 0 {
return o.appendResult()
Expand Down
1 change: 1 addition & 0 deletions internal/model/invokedoperationattribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
AttributeKeyCorrelationId AttributeKey = "correlationId"
AttributeKeyAzureDeploymentName AttributeKey = "azureDeploymentName"
AttributeKeyResumeToken AttributeKey = "resumeToken"
AttributeKeyArmTemplate AttributeKey = "armTemplate"
)

type InvokedOperationAttribute struct {
Expand Down
22 changes: 11 additions & 11 deletions internal/model/operation/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,30 @@ type executor struct {
}

// default implementation for executing an operation
func (exe *executor) Execute(context ExecutionContext) error {
if reasons, ok := context.Operation().IsExecutable(); !ok {
func (exe *executor) Execute(executionContext ExecutionContext) error {
if reasons, ok := executionContext.Operation().IsExecutable(); !ok {
log.Infof("Operation is not in an executable state: %s", reasons)
return nil
}

err := context.Running()
err := executionContext.Running()

if err != nil {
log.Errorf("error updating operation to running: %s", err.Error())
return err
}

do := WithLogging(exe.operation)
err = do(context)
err = do(executionContext)

if err != nil {
context.Error(err)
executionContext.Error(err)

if context.Operation().AttemptsExceeded() {
err = context.Failed()
context.Complete()
if executionContext.Operation().AttemptsExceeded() {
err = executionContext.Failed()
executionContext.Complete()
} else {
retryErr := context.Retry()
retryErr := executionContext.Retry()
if retryErr != nil {
log.Errorf("attempt to retry operation caused error: %s", retryErr.Error())
}
Expand All @@ -68,8 +68,8 @@ func (exe *executor) Execute(context ExecutionContext) error {
return err
}

context.Success()
context.Complete()
executionContext.Success()
executionContext.Complete()

return nil
}
Expand Down
40 changes: 34 additions & 6 deletions internal/model/operation/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package operation
import (
"context"
"errors"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources"
Expand All @@ -11,9 +12,14 @@ import (
log "github.com/sirupsen/logrus"
)

type FinderResponse struct {
Name string
}
type AzureDeploymentNameFinder struct {
client *armresources.DeploymentsClient
operationId uuid.UUID
ticker *time.Ticker
done chan FinderResponse
resourceGroupName string
}

Expand All @@ -34,13 +40,39 @@ func NewAzureDeploymentNameFinder(operation *Operation) (*AzureDeploymentNameFin
}
return &AzureDeploymentNameFinder{
client: client,
ticker: time.NewTicker(10 * time.Second),
done: make(chan FinderResponse, 1),
resourceGroupName: deployment.ResourceGroup,
operationId: operation.ID,
}, nil
}

func (finder *AzureDeploymentNameFinder) Find(ctx context.Context) (string, error) {
return finder.getName(ctx)
func (finder *AzureDeploymentNameFinder) FindUntilDone(ctx context.Context) (string, error) {
for {
select {
case <-finder.ticker.C:
log.Tracef("Finding deployment name for operationId: %s", finder.operationId)
name, err := finder.getName(ctx)
if err != nil {
log.Errorf("Failed to find deployment name for operationId: %s", finder.operationId)
}
if len(name) > 0 {
finder.done <- FinderResponse{
Name: name,
}
}
case response := <-finder.done:
finder.ticker.Stop()
log.WithFields(log.Fields{
"operationId": finder.operationId,
"name": response.Name,
}).Trace("name finder done")
return response.Name, nil
case <-ctx.Done():
finder.ticker.Stop()
return "", ctx.Err()
}
}
}

// get by correlationId
Expand Down Expand Up @@ -82,9 +114,5 @@ func (finder *AzureDeploymentNameFinder) getName(ctx context.Context) (string, e
}
}

if name == "" {
log.WithField("operationId", finder.operationId).Warn("Failed to find deployment by operationId")
return name, errors.New("failed to find deployment by operationId")
}
return name, nil
}
7 changes: 6 additions & 1 deletion internal/model/operation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ func (service *OperationManager) new(i *model.InvokedOperation) (*model.InvokedO
return i, nil
}

func (service *OperationManager) any(id uuid.UUID) bool {
var count int64
service.db.Model(&model.InvokedOperation{}).Where("id = ?", id).Count(&count)
return count > 0
}

// initializes and returns the single instance of InvokedOperation by the context's id
// if the id is invalid and an instance cannot be found, returns an error
func (service *OperationManager) initialize(id uuid.UUID) (*Operation, error) {
Expand All @@ -150,7 +156,6 @@ func (service *OperationManager) initialize(id uuid.UUID) (*Operation, error) {
manager: service,
}

service.log.Trace("operation service initialized.")
return service.operation, nil
}

Expand Down
5 changes: 5 additions & 0 deletions internal/model/operation/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type OperationFuncProvider interface {
type Repository interface {
New(operationType sdk.OperationType, configure Configure) (*Operation, error)
First(id uuid.UUID) (*Operation, error)
Any(id uuid.UUID) bool
Provider(provider OperationFuncProvider) error
WithContext(ctx context.Context)
}
Expand Down Expand Up @@ -93,6 +94,10 @@ func (repo *repository) First(id uuid.UUID) (*Operation, error) {
return operation, nil
}

func (repo *repository) Any(id uuid.UUID) bool {
return repo.manager.any(id)
}

// NewRepository creates a new operation repository
// appConfig: application configuration
// provider: operation function provider, optional if the operation is not going to be executed and you want to interact with the operation
Expand Down
Loading

0 comments on commit a9d0875

Please sign in to comment.