Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: fix sync when restarting the application #775

Merged
merged 1 commit into from
Apr 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 52 additions & 36 deletions pkg/server/domain/service/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,33 +423,41 @@ func (w *workflowServiceImpl) SyncWorkflowRecord(ctx context.Context, appPrimary
}
for _, item := range unfinishedRecords {
record := item.(*model.WorkflowRecord)
revision := &model.ApplicationRevision{AppPrimaryKey: appPrimaryKey, Version: record.RevisionPrimaryKey}
if err := w.Store.Get(ctx, revision); err != nil {
if errors.Is(err, datastore.ErrRecordNotExist) {
// If the application revision is not exist, the record do not need be synced
record.Finished = "true"
record.Status = model.RevisionStatusFail
if err := w.Store.Put(ctx, record); err != nil {
return fmt.Errorf(("failed to set the record status to terminated: %s"), err.Error())
}
return bcode.ErrApplicationRevisionNotExist
}
return err
}
// sync from application status
if record.Name == recordName {
if err := w.syncRecordFromApplicationStatus(ctx, app, record, workflowContext, appPrimaryKey); err != nil {
if err := w.syncRecordFromApplicationStatus(ctx, app, record, revision, workflowContext); err != nil {
klog.Errorf("failed to sync workflow record %s from application status %s", record.Name, err.Error())
}
continue
}
// sync from application revision
if err := w.syncRecordFromApplicationRevision(ctx, record); err != nil {
klog.Errorf("failed to sync workflow record %s from application revision %s", record.Name, err.Error())
if revision.RevisionCRName != "" {
// sync from application revision
if err := w.syncRecordFromApplicationRevision(ctx, record, revision); err != nil {
klog.Errorf("failed to sync workflow record %s from application revision %s", record.Name, err.Error())
}
}
}

return nil
}

func (w *workflowServiceImpl) syncRecordFromApplicationStatus(ctx context.Context, app *v1beta1.Application, record *model.WorkflowRecord, workflowContext map[string]string, appPrimaryKey string) error {
func (w *workflowServiceImpl) syncRecordFromApplicationStatus(ctx context.Context, app *v1beta1.Application, record *model.WorkflowRecord, revision *model.ApplicationRevision, workflowContext map[string]string) error {
if app == nil || app.Annotations == nil || app.Status.Workflow == nil {
return nil
}
var revision = &model.ApplicationRevision{AppPrimaryKey: appPrimaryKey, Version: record.RevisionPrimaryKey}
if err := w.Store.Get(ctx, revision); err != nil {
if errors.Is(err, datastore.ErrRecordNotExist) {
return bcode.ErrApplicationRevisionNotExist
}
return err
}

if workflowContext != nil {
record.ContextValue = workflowContext
Expand Down Expand Up @@ -489,10 +497,14 @@ func (w *workflowServiceImpl) syncRecordFromApplicationStatus(ctx context.Contex
for i, step := range record.Steps {
if s, ok := stepStatus[step.Name]; ok {
record.Steps[i].StepStatus = s
} else {
record.Steps[i].StepStatus = resetStepStatus(step.StepStatus)
}
for j, sub := range step.SubStepsStatus {
if s, ok := stepStatus[sub.Name]; ok {
record.Steps[i].SubStepsStatus[j] = s
} else {
record.Steps[i].SubStepsStatus[j] = resetStepStatus(sub)
}
}
}
Expand All @@ -513,6 +525,7 @@ func (w *workflowServiceImpl) syncRecordFromApplicationStatus(ctx context.Contex
revision.RevisionCRName = app.Status.LatestRevision.Name
}
if err := w.Store.Put(ctx, revision); err != nil {
klog.ErrorS(err, "failed to update application revision status", "revision", revision.Version)
return err
}

Expand All @@ -522,20 +535,19 @@ func (w *workflowServiceImpl) syncRecordFromApplicationStatus(ctx context.Contex
return nil
}

func (w *workflowServiceImpl) syncRecordFromApplicationRevision(ctx context.Context, record *model.WorkflowRecord) error {
var revision = &model.ApplicationRevision{AppPrimaryKey: record.AppPrimaryKey, Version: record.RevisionPrimaryKey}
if err := w.Store.Get(ctx, revision); err != nil {
if errors.Is(err, datastore.ErrRecordNotExist) {
// If the application revision is not exist, the record do not need be synced
record.Finished = "true"
record.Status = model.RevisionStatusFail
if err := w.Store.Put(ctx, record); err != nil {
return fmt.Errorf(("failed to set the record status to terminated: %s"), err.Error())
}
}
return fmt.Errorf(("failed to get the application revision from database: %s"), err.Error())
}
func resetStepStatus(status model.StepStatus) model.StepStatus {
status.Phase = ""
status.Message = ""
status.LastExecuteTime = time.Time{}
status.FirstExecuteTime = time.Time{}
return status
}

func (w *workflowServiceImpl) syncRecordFromApplicationRevision(ctx context.Context, record *model.WorkflowRecord, revision *model.ApplicationRevision) error {
var appRevision v1beta1.ApplicationRevision
if revision.RevisionCRName == "" {
return nil
}
if err := w.KubeClient.Get(ctx, types.NamespacedName{Namespace: record.Namespace, Name: revision.RevisionCRName}, &appRevision); err != nil {
if apierrors.IsNotFound(err) {
klog.Warningf("can't find the application revision %s/%s, set the record status to terminated", revision.RevisionCRName, record.Namespace)
Expand All @@ -553,21 +565,25 @@ func (w *workflowServiceImpl) syncRecordFromApplicationRevision(ctx context.Cont
appRevision.Spec.Application.Status.Workflow.Finished = true
appRevision.Spec.Application.Status.Workflow.Terminated = true
}
phase := appRevision.Spec.Application.Status.Workflow.Phase
if phase != workflowv1alpha1.WorkflowStateFailed && phase != workflowv1alpha1.WorkflowStateSucceeded {
appRevision.Spec.Application.Status.Workflow.Phase = workflowv1alpha1.WorkflowStateTerminated
}
}
return w.syncRecordFromApplicationStatus(ctx, &appRevision.Spec.Application, record, appRevision.Status.WorkflowContext, record.AppPrimaryKey)
return w.syncRecordFromApplicationStatus(ctx, &appRevision.Spec.Application, record, revision, appRevision.Status.WorkflowContext)
}

func generateRevisionStatus(phase workflowv1alpha1.WorkflowRunPhase) string {
summaryStatus := model.RevisionStatusRunning
switch {
case phase == workflowv1alpha1.WorkflowStateFailed:
summaryStatus = model.RevisionStatusFail
case phase == workflowv1alpha1.WorkflowStateSucceeded:
summaryStatus = model.RevisionStatusComplete
case phase == workflowv1alpha1.WorkflowStateTerminated:
summaryStatus = model.RevisionStatusTerminated
}
return summaryStatus
switch phase {
case workflowv1alpha1.WorkflowStateFailed:
return model.RevisionStatusFail
case workflowv1alpha1.WorkflowStateSucceeded:
return model.RevisionStatusComplete
case workflowv1alpha1.WorkflowStateTerminated:
return model.RevisionStatusTerminated
default:
return model.RevisionStatusRunning
}
}

func (w *workflowServiceImpl) CreateWorkflowRecord(ctx context.Context, appModel *model.Application, app *v1beta1.Application, workflow *model.Workflow) (*model.WorkflowRecord, error) {
Expand Down
22 changes: 12 additions & 10 deletions pkg/server/event/sync/cr2ux.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,13 @@ func (c *CR2UX) syncAppCreatedByUX(ctx context.Context, targetApp *v1beta1.Appli
if appPrimaryKey == "" {
return fmt.Errorf("appName is empty in application %s", targetApp.Name)
}
if targetApp.Annotations == nil || targetApp.Annotations[oam.AnnotationPublishVersion] == "" {
klog.Warningf("app %s/%s has no publish version, skip sync workflow status", targetApp.Namespace, targetApp.Name)
var recordName string
if targetApp.Status.Workflow != nil {
recordName = strings.Replace(targetApp.Status.Workflow.AppRevision, ":", "-", 1)
} else {
klog.Warningf("app %s/%s has no revision in status, skip sync workflow status", targetApp.Namespace, targetApp.Name)
return nil
}
recordName := targetApp.Annotations[oam.AnnotationPublishVersion]
if err := c.workflowService.SyncWorkflowRecord(ctx, appPrimaryKey, recordName, targetApp, nil); err != nil {
klog.ErrorS(err, "failed to sync workflow status", "oam app name", targetApp.Name, "workflow name", oam.GetPublishVersion(targetApp), "record name", recordName)
return err
Expand Down Expand Up @@ -157,13 +160,12 @@ func (c *CR2UX) syncAppCreatedByCLI(ctx context.Context, targetApp *v1beta1.Appl
klog.Infof("application %s/%s revision %s synced successful", targetApp.Name, targetApp.Namespace, syncedVersion)
}

recordName := oam.GetPublishVersion(targetApp)
if recordName == "" {
if targetApp.Status.Workflow != nil {
recordName = strings.Replace(targetApp.Status.Workflow.AppRevision, ":", "-", 1)
} else {
klog.Warningf("app %s/%s has no publish version or revision in status, skip sync workflow status", targetApp.Namespace, targetApp.Name)
}
var recordName string
if targetApp.Status.Workflow != nil {
recordName = strings.Replace(targetApp.Status.Workflow.AppRevision, ":", "-", 1)
} else {
klog.Warningf("app %s/%s has no revision in status, skip sync workflow status", targetApp.Namespace, targetApp.Name)
return nil
}
return c.workflowService.SyncWorkflowRecord(ctx, c.getAppMetaName(ctx, targetApp.Name, targetApp.Namespace), recordName, targetApp, nil)
}
Expand Down