Skip to content

Commit

Permalink
Fix: fix sync when restarting the application
Browse files Browse the repository at this point in the history
Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>
  • Loading branch information
FogDong committed Apr 15, 2023
1 parent 118ea99 commit a11eea3
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 46 deletions.
88 changes: 52 additions & 36 deletions pkg/server/domain/service/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,33 +423,41 @@ func (w *workflowServiceImpl) SyncWorkflowRecord(ctx context.Context, appPrimary
}
for _, item := range unfinishedRecords {
record := item.(*model.WorkflowRecord)
revision := &model.ApplicationRevision{AppPrimaryKey: appPrimaryKey, Version: record.RevisionPrimaryKey}
if err := w.Store.Get(ctx, revision); err != nil {
if errors.Is(err, datastore.ErrRecordNotExist) {
// If the application revision is not exist, the record do not need be synced
record.Finished = "true"
record.Status = model.RevisionStatusFail
if err := w.Store.Put(ctx, record); err != nil {
return fmt.Errorf(("failed to set the record status to terminated: %s"), err.Error())
}
return bcode.ErrApplicationRevisionNotExist
}
return err
}
// sync from application status
if record.Name == recordName {
if err := w.syncRecordFromApplicationStatus(ctx, app, record, workflowContext, appPrimaryKey); err != nil {
if err := w.syncRecordFromApplicationStatus(ctx, app, record, revision, workflowContext); err != nil {
klog.Errorf("failed to sync workflow record %s from application status %s", record.Name, err.Error())
}
continue
}
// sync from application revision
if err := w.syncRecordFromApplicationRevision(ctx, record); err != nil {
klog.Errorf("failed to sync workflow record %s from application revision %s", record.Name, err.Error())
if revision.RevisionCRName != "" {
// sync from application revision
if err := w.syncRecordFromApplicationRevision(ctx, record, revision); err != nil {
klog.Errorf("failed to sync workflow record %s from application revision %s", record.Name, err.Error())
}
}
}

return nil
}

func (w *workflowServiceImpl) syncRecordFromApplicationStatus(ctx context.Context, app *v1beta1.Application, record *model.WorkflowRecord, workflowContext map[string]string, appPrimaryKey string) error {
func (w *workflowServiceImpl) syncRecordFromApplicationStatus(ctx context.Context, app *v1beta1.Application, record *model.WorkflowRecord, revision *model.ApplicationRevision, workflowContext map[string]string) error {
if app == nil || app.Annotations == nil || app.Status.Workflow == nil {
return nil
}
var revision = &model.ApplicationRevision{AppPrimaryKey: appPrimaryKey, Version: record.RevisionPrimaryKey}
if err := w.Store.Get(ctx, revision); err != nil {
if errors.Is(err, datastore.ErrRecordNotExist) {
return bcode.ErrApplicationRevisionNotExist
}
return err
}

if workflowContext != nil {
record.ContextValue = workflowContext
Expand Down Expand Up @@ -489,10 +497,14 @@ func (w *workflowServiceImpl) syncRecordFromApplicationStatus(ctx context.Contex
for i, step := range record.Steps {
if s, ok := stepStatus[step.Name]; ok {
record.Steps[i].StepStatus = s
} else {
record.Steps[i].StepStatus = resetStepStatus(step.StepStatus)
}
for j, sub := range step.SubStepsStatus {
if s, ok := stepStatus[sub.Name]; ok {
record.Steps[i].SubStepsStatus[j] = s
} else {
record.Steps[i].SubStepsStatus[j] = resetStepStatus(sub)
}
}
}
Expand All @@ -513,6 +525,7 @@ func (w *workflowServiceImpl) syncRecordFromApplicationStatus(ctx context.Contex
revision.RevisionCRName = app.Status.LatestRevision.Name
}
if err := w.Store.Put(ctx, revision); err != nil {
klog.ErrorS(err, "failed to update application revision status", "revision", revision.Version)
return err
}

Expand All @@ -522,20 +535,19 @@ func (w *workflowServiceImpl) syncRecordFromApplicationStatus(ctx context.Contex
return nil
}

func (w *workflowServiceImpl) syncRecordFromApplicationRevision(ctx context.Context, record *model.WorkflowRecord) error {
var revision = &model.ApplicationRevision{AppPrimaryKey: record.AppPrimaryKey, Version: record.RevisionPrimaryKey}
if err := w.Store.Get(ctx, revision); err != nil {
if errors.Is(err, datastore.ErrRecordNotExist) {
// If the application revision is not exist, the record do not need be synced
record.Finished = "true"
record.Status = model.RevisionStatusFail
if err := w.Store.Put(ctx, record); err != nil {
return fmt.Errorf(("failed to set the record status to terminated: %s"), err.Error())
}
}
return fmt.Errorf(("failed to get the application revision from database: %s"), err.Error())
}
func resetStepStatus(status model.StepStatus) model.StepStatus {
status.Phase = ""
status.Message = ""
status.LastExecuteTime = time.Time{}
status.FirstExecuteTime = time.Time{}
return status
}

func (w *workflowServiceImpl) syncRecordFromApplicationRevision(ctx context.Context, record *model.WorkflowRecord, revision *model.ApplicationRevision) error {
var appRevision v1beta1.ApplicationRevision
if revision.RevisionCRName == "" {
return nil
}
if err := w.KubeClient.Get(ctx, types.NamespacedName{Namespace: record.Namespace, Name: revision.RevisionCRName}, &appRevision); err != nil {
if apierrors.IsNotFound(err) {
klog.Warningf("can't find the application revision %s/%s, set the record status to terminated", revision.RevisionCRName, record.Namespace)
Expand All @@ -553,21 +565,25 @@ func (w *workflowServiceImpl) syncRecordFromApplicationRevision(ctx context.Cont
appRevision.Spec.Application.Status.Workflow.Finished = true
appRevision.Spec.Application.Status.Workflow.Terminated = true
}
phase := appRevision.Spec.Application.Status.Workflow.Phase
if phase != workflowv1alpha1.WorkflowStateFailed && phase != workflowv1alpha1.WorkflowStateSucceeded {
appRevision.Spec.Application.Status.Workflow.Phase = workflowv1alpha1.WorkflowStateTerminated
}
}
return w.syncRecordFromApplicationStatus(ctx, &appRevision.Spec.Application, record, appRevision.Status.WorkflowContext, record.AppPrimaryKey)
return w.syncRecordFromApplicationStatus(ctx, &appRevision.Spec.Application, record, revision, appRevision.Status.WorkflowContext)
}

func generateRevisionStatus(phase workflowv1alpha1.WorkflowRunPhase) string {
summaryStatus := model.RevisionStatusRunning
switch {
case phase == workflowv1alpha1.WorkflowStateFailed:
summaryStatus = model.RevisionStatusFail
case phase == workflowv1alpha1.WorkflowStateSucceeded:
summaryStatus = model.RevisionStatusComplete
case phase == workflowv1alpha1.WorkflowStateTerminated:
summaryStatus = model.RevisionStatusTerminated
}
return summaryStatus
switch phase {
case workflowv1alpha1.WorkflowStateFailed:
return model.RevisionStatusFail
case workflowv1alpha1.WorkflowStateSucceeded:
return model.RevisionStatusComplete
case workflowv1alpha1.WorkflowStateTerminated:
return model.RevisionStatusTerminated
default:
return model.RevisionStatusRunning
}
}

func (w *workflowServiceImpl) CreateWorkflowRecord(ctx context.Context, appModel *model.Application, app *v1beta1.Application, workflow *model.Workflow) (*model.WorkflowRecord, error) {
Expand Down
22 changes: 12 additions & 10 deletions pkg/server/event/sync/cr2ux.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,13 @@ func (c *CR2UX) syncAppCreatedByUX(ctx context.Context, targetApp *v1beta1.Appli
if appPrimaryKey == "" {
return fmt.Errorf("appName is empty in application %s", targetApp.Name)
}
if targetApp.Annotations == nil || targetApp.Annotations[oam.AnnotationPublishVersion] == "" {
klog.Warningf("app %s/%s has no publish version, skip sync workflow status", targetApp.Namespace, targetApp.Name)
var recordName string
if targetApp.Status.Workflow != nil {
recordName = strings.Replace(targetApp.Status.Workflow.AppRevision, ":", "-", 1)
} else {
klog.Warningf("app %s/%s has no revision in status, skip sync workflow status", targetApp.Namespace, targetApp.Name)
return nil
}
recordName := targetApp.Annotations[oam.AnnotationPublishVersion]
if err := c.workflowService.SyncWorkflowRecord(ctx, appPrimaryKey, recordName, targetApp, nil); err != nil {
klog.ErrorS(err, "failed to sync workflow status", "oam app name", targetApp.Name, "workflow name", oam.GetPublishVersion(targetApp), "record name", recordName)
return err
Expand Down Expand Up @@ -157,13 +160,12 @@ func (c *CR2UX) syncAppCreatedByCLI(ctx context.Context, targetApp *v1beta1.Appl
klog.Infof("application %s/%s revision %s synced successful", targetApp.Name, targetApp.Namespace, syncedVersion)
}

recordName := oam.GetPublishVersion(targetApp)
if recordName == "" {
if targetApp.Status.Workflow != nil {
recordName = strings.Replace(targetApp.Status.Workflow.AppRevision, ":", "-", 1)
} else {
klog.Warningf("app %s/%s has no publish version or revision in status, skip sync workflow status", targetApp.Namespace, targetApp.Name)
}
var recordName string
if targetApp.Status.Workflow != nil {
recordName = strings.Replace(targetApp.Status.Workflow.AppRevision, ":", "-", 1)
} else {
klog.Warningf("app %s/%s has no revision in status, skip sync workflow status", targetApp.Namespace, targetApp.Name)
return nil
}
return c.workflowService.SyncWorkflowRecord(ctx, c.getAppMetaName(ctx, targetApp.Name, targetApp.Namespace), recordName, targetApp, nil)
}
Expand Down

0 comments on commit a11eea3

Please sign in to comment.