From cd1f8469d5efe05cfa7ddc38a0e75057fc37e3e5 Mon Sep 17 00:00:00 2001 From: wenquan xing Date: Fri, 13 Oct 2023 15:40:03 -0700 Subject: [PATCH] ? --- tools/tdbg/commands.go | 46 +++++++++++++++++++++++++++++++++++++ tools/tdbg/flags.go | 1 + tools/tdbg/tdbg_commands.go | 32 ++++++++++++++++++++++++++ tools/tdbg/util.go | 7 ++++++ 4 files changed, 86 insertions(+) diff --git a/tools/tdbg/commands.go b/tools/tdbg/commands.go index d5796f8f55d..f7d64d41c4f 100644 --- a/tools/tdbg/commands.go +++ b/tools/tdbg/commands.go @@ -686,3 +686,49 @@ func AdminRebuildMutableState(c *cli.Context, clientFactory ClientFactory) error } return nil } + +// AdminResendReplicationTasks generate replication task +func AdminResendReplicationTasks(c *cli.Context, clientFactory ClientFactory) error { + adminClient := clientFactory.AdminClient(c) + + nid, err := getRequiredOption(c, FlagNamespaceID) + if err != nil { + return err + } + + wid, err := getRequiredOption(c, FlagWorkflowID) + if err != nil { + return err + } + rid, err := getRequiredOption(c, FlagRunID) + if err != nil { + return err + } + version, err := getRequiredInt64Option(c, FlagEventVersion) + if err != nil { + return err + } + + cluster, err := getRequiredOption(c, FlagCluster) + if err != nil { + return err + } + + ctx, cancel := newContext(c) + defer cancel() + + _, err = adminClient.ResendReplicationTasks(ctx, &adminservice.ResendReplicationTasksRequest{ + NamespaceId: nid, + WorkflowId: wid, + RunId: rid, + RemoteCluster: cluster, + StartEventId: 0, + StartVersion: version, + }) + if err != nil { + return fmt.Errorf("resend replication task failed: %s", err) + } else { + fmt.Println("resend replication task succeeded.") + } + return nil +} diff --git a/tools/tdbg/flags.go b/tools/tdbg/flags.go index 774124bf96e..5b41f71b42b 100644 --- a/tools/tdbg/flags.go +++ b/tools/tdbg/flags.go @@ -67,6 +67,7 @@ var ( FlagLastMessageID = "last-message-id" FlagYes = "yes" FlagMore = "more" + FlagEventVersion = "event-version" FlagMinEventVersion = "min-event-version" FlagMaxEventVersion = "max-event-version" FlagMinTaskID = "min-task-id" diff --git a/tools/tdbg/tdbg_commands.go b/tools/tdbg/tdbg_commands.go index 400bd11380b..e46ee8322fb 100644 --- a/tools/tdbg/tdbg_commands.go +++ b/tools/tdbg/tdbg_commands.go @@ -196,6 +196,38 @@ func newAdminWorkflowCommands(clientFactory ClientFactory) []*cli.Command { return AdminRebuildMutableState(c, clientFactory) }, }, + { + Name: "resend-replication-task", + Aliases: []string{}, + Usage: "Resend replication task", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: FlagNamespaceID, + Usage: "Namespace ID", + }, + &cli.StringFlag{ + Name: FlagWorkflowID, + Aliases: FlagWorkflowIDAlias, + Usage: "Workflow ID", + }, + &cli.StringFlag{ + Name: FlagRunID, + Aliases: FlagRunIDAlias, + Usage: "Run ID", + }, + &cli.StringFlag{ + Name: FlagCluster, + Usage: "Source cluster", + }, + &cli.Int64Flag{ + Name: FlagEventVersion, + Usage: "event version", + }, + }, + Action: func(c *cli.Context) error { + return AdminResendReplicationTasks(c, clientFactory) + }, + }, { Name: "delete", Aliases: []string{"del"}, diff --git a/tools/tdbg/util.go b/tools/tdbg/util.go index b7b406067b3..02be650dba2 100644 --- a/tools/tdbg/util.go +++ b/tools/tdbg/util.go @@ -73,6 +73,13 @@ func getRequiredOption(c *cli.Context, optionName string) (string, error) { return value, nil } +func getRequiredInt64Option(c *cli.Context, optionName string) (int64, error) { + if !c.IsSet(optionName) { + return 0, fmt.Errorf("option is required: %s", optionName) + } + return c.Int64(optionName), nil +} + func parseTime(timeStr string, defaultValue time.Time, now time.Time) (time.Time, error) { if len(timeStr) == 0 { return defaultValue, nil