Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Getting corrupted workflow when sending workflow with high concurrency #3884

Closed
hulucc opened this issue Jan 11, 2021 · 2 comments
Closed

Getting corrupted workflow when sending workflow with high concurrency #3884

hulucc opened this issue Jan 11, 2021 · 2 comments

Comments

@hulucc
Copy link

hulucc commented Jan 11, 2021

The list view says the workflow is open, but it has actually completed.

image
image

It can be easily reproduced by sending 2000 workflows to the Cadence server concurrently:

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/google/uuid"
	"go.uber.org/cadence/activity"
	"go.uber.org/cadence/worker"
	"go.uber.org/yarpc/transport/tchannel"

	// "go.uber.org/yarpc/api/transport"
	"go.uber.org/cadence"
	"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
	"go.uber.org/cadence/client"
	"go.uber.org/cadence/workflow"
	"go.uber.org/yarpc"
)

// cc is the package-level Cadence client. main assigns it, and the
// workflow-start helpers defined in main call StartWorkflow on it.
var cc client.Client

func init() {
	workflow.Register(SimpleWorkflow)
	workflow.Register(CronWorkflow)
	activity.Register(SimpleActivity)
}

// SimpleWorkflow executes SimpleActivity once, with one-minute timeouts and up
// to three attempts, and returns whatever error the activity ends with.
func SimpleWorkflow(ctx workflow.Context) error {
	// Retry with exponential backoff; panics and generic errors are not retried.
	// No ExpirationInterval is set.
	retryPolicy := &cadence.RetryPolicy{
		InitialInterval:    time.Second,
		BackoffCoefficient: 2,
		MaximumInterval:    time.Minute,
		MaximumAttempts:    3,
		NonRetriableErrorReasons: []string{
			"cadenceInternal:Panic",
			"cadenceInternal:Generic",
		},
	}

	activityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Minute,
		ScheduleToCloseTimeout: time.Minute,
		StartToCloseTimeout:    time.Minute,
		WaitForCancellation:    false,
		RetryPolicy:            retryPolicy,
	})

	return workflow.ExecuteActivity(activityCtx, SimpleActivity).Get(activityCtx, nil)
}

// CronWorkflow runs SimpleActivity once, sleeps for ten seconds, and then
// continues as a new run of itself.
//
// The activity result is treated as best effort: a failed activity does not
// stop the loop. An error from the sleep (for example, a cancellation of the
// workflow context) is returned to the caller instead of being swallowed, so
// the workflow does not continue as new after it has been asked to stop.
func CronWorkflow(ctx workflow.Context) error {
	ao := workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Minute,
		ScheduleToCloseTimeout: time.Minute,
		StartToCloseTimeout:    time.Minute,
		WaitForCancellation:    false,
		RetryPolicy: &cadence.RetryPolicy{
			InitialInterval:    time.Second,
			BackoffCoefficient: 2,
			MaximumInterval:    time.Minute,
			MaximumAttempts:    3,
			// ExpirationInterval: time.Hour,
			NonRetriableErrorReasons: []string{
				"cadenceInternal:Panic",
				"cadenceInternal:Generic",
			},
		},
	}
	ctx = workflow.WithActivityOptions(ctx, ao)

	// Deliberately ignored: an activity failure must not break the
	// run/sleep/continue-as-new cycle.
	_ = workflow.ExecuteActivity(ctx, SimpleActivity).Get(ctx, nil)

	// Propagate the sleep error rather than continuing as new regardless.
	if err := workflow.Sleep(ctx, time.Second*10); err != nil {
		return err
	}
	return workflow.NewContinueAsNewError(ctx, CronWorkflow)
}

// SimpleActivity simulates one second of work, prints "done" with the builtin
// println, and always reports success. The context is not consulted.
func SimpleActivity(ctx context.Context) error {
	const workDuration = time.Second

	time.Sleep(workDuration)
	println("done")
	return nil
}

// main connects to the Cadence frontend over TChannel and then runs in one of
// three modes, selected by the first command-line argument:
//
//   - no argument: run a worker that polls the task list until it is stopped;
//   - "start": start 2000 SimpleWorkflow executions concurrently, wait for
//     every start request to finish, and exit;
//   - "cron": start CronWorkflow with a one-minute cron schedule and exit.
//
// Any other argument prints the list of accepted roles. Setup and start
// failures panic.
func main() {
	const (
		hostPort       = "cadence-frontend.cadence:7933"
		clientName     = "simple-worker"
		domain         = "play"
		taskList       = "simple-worker"
		cadenceService = "cadence-frontend"
	)
	// workflowCount is the number of workflows the "start" mode launches at once.
	const workflowCount = 2000

	ch, err := tchannel.NewChannelTransport(tchannel.ServiceName(clientName))
	if err != nil {
		panic(err)
	}
	dispatcher := yarpc.NewDispatcher(yarpc.Config{
		Name: clientName,
		Outbounds: yarpc.Outbounds{
			cadenceService: {Unary: ch.NewSingleOutbound(hostPort)},
		},
	})
	if err := dispatcher.Start(); err != nil {
		panic(err)
	}
	// Best-effort shutdown of the transport; there is nothing useful to do
	// with a stop error on the way out.
	defer func() { _ = dispatcher.Stop() }()

	service := workflowserviceclient.New(dispatcher.ClientConfig(cadenceService))
	cc = client.NewClient(service, domain, &client.Options{})

	// startOne starts a single SimpleWorkflow under a fresh UUID and prints
	// how long the start request took.
	startOne := func() error {
		s := time.Now()
		wf, err := cc.StartWorkflow(context.Background(), client.StartWorkflowOptions{
			ID:                           uuid.New().String(),
			TaskList:                     taskList,
			ExecutionStartToCloseTimeout: time.Hour,
		}, SimpleWorkflow)
		if err != nil {
			return err
		}
		fmt.Printf("duration: %+v, start: %v, end: %v, id: %s\n", time.Since(s).Seconds(), s.Format("15:04:05"), time.Now().Format("15:04:05"), wf.ID)
		return nil
	}

	// startCron starts the CronWorkflow under its fixed workflow ID.
	startCron := func() error {
		_, err := cc.StartWorkflow(context.Background(), client.StartWorkflowOptions{
			ID:                           "CronWorkflow",
			TaskList:                     taskList,
			ExecutionStartToCloseTimeout: time.Hour,
			CronSchedule:                 "* * * * *",
		}, CronWorkflow)
		return err
	}

	// Worker mode: the worker is only constructed when it is actually needed.
	if len(os.Args) < 2 {
		w := worker.New(service, domain, taskList, worker.Options{})
		if err := w.Run(); err != nil {
			panic(err)
		}
		return
	}

	switch os.Args[1] {
	case "start":
		// The channel is buffered for every result so that no goroutine can
		// block on send if main panics before draining all of them.
		results := make(chan error, workflowCount)
		for i := 0; i < workflowCount; i++ {
			go func() { results <- startOne() }()
		}
		// Wait for every start request instead of blocking forever, and
		// surface the first failure from the main goroutine.
		for i := 0; i < workflowCount; i++ {
			if err := <-results; err != nil {
				panic(err)
			}
		}
	case "cron":
		if err := startCron(); err != nil {
			panic(err)
		}
	default:
		fmt.Printf("%+v\n", "role: start/cron")
	}
}
@hulucc
Copy link
Author

hulucc commented Jan 13, 2021

It's caused by system.enableVisibilitySampling. To avoid this problem, either disable visibility sampling and use a standalone DB for visibility, or use Elasticsearch.

@hulucc hulucc closed this as completed Jan 13, 2021
@zhangkyou
Copy link

The default value of system.enableVisibilitySampling in dynamicconfig is FALSE. I didn't write this config key in my dynamicconfig file, but I still have the problem.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants