Skip to content

Commit

Permalink
?
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 committed Nov 3, 2023
1 parent e064512 commit cd1f846
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 0 deletions.
46 changes: 46 additions & 0 deletions tools/tdbg/commands.go
Expand Up @@ -686,3 +686,49 @@ func AdminRebuildMutableState(c *cli.Context, clientFactory ClientFactory) error
}
return nil
}

// AdminResendReplicationTasks generate replication task
func AdminResendReplicationTasks(c *cli.Context, clientFactory ClientFactory) error {
adminClient := clientFactory.AdminClient(c)

nid, err := getRequiredOption(c, FlagNamespaceID)
if err != nil {
return err
}

wid, err := getRequiredOption(c, FlagWorkflowID)
if err != nil {
return err
}
rid, err := getRequiredOption(c, FlagRunID)
if err != nil {
return err
}
version, err := getRequiredInt64Option(c, FlagEventVersion)
if err != nil {
return err
}

cluster, err := getRequiredOption(c, FlagCluster)
if err != nil {
return err
}

ctx, cancel := newContext(c)
defer cancel()

_, err = adminClient.ResendReplicationTasks(ctx, &adminservice.ResendReplicationTasksRequest{
NamespaceId: nid,
WorkflowId: wid,
RunId: rid,
RemoteCluster: cluster,
StartEventId: 0,
StartVersion: version,
})
if err != nil {
return fmt.Errorf("resend replication task failed: %s", err)
} else {
fmt.Println("resend replication task succeeded.")
}
return nil
}
1 change: 1 addition & 0 deletions tools/tdbg/flags.go
Expand Up @@ -67,6 +67,7 @@ var (
FlagLastMessageID = "last-message-id"
FlagYes = "yes"
FlagMore = "more"
FlagEventVersion = "event-version"
FlagMinEventVersion = "min-event-version"
FlagMaxEventVersion = "max-event-version"
FlagMinTaskID = "min-task-id"
Expand Down
32 changes: 32 additions & 0 deletions tools/tdbg/tdbg_commands.go
Expand Up @@ -196,6 +196,38 @@ func newAdminWorkflowCommands(clientFactory ClientFactory) []*cli.Command {
return AdminRebuildMutableState(c, clientFactory)
},
},
{
Name: "resend-replication-task",
Aliases: []string{},
Usage: "Resend replication task",
Flags: []cli.Flag{
&cli.StringFlag{
Name: FlagNamespaceID,
Usage: "Namespace ID",
},
&cli.StringFlag{
Name: FlagWorkflowID,
Aliases: FlagWorkflowIDAlias,
Usage: "Workflow ID",
},
&cli.StringFlag{
Name: FlagRunID,
Aliases: FlagRunIDAlias,
Usage: "Run ID",
},
&cli.StringFlag{
Name: FlagCluster,
Usage: "Source cluster",
},
&cli.Int64Flag{
Name: FlagEventVersion,
Usage: "event version",
},
},
Action: func(c *cli.Context) error {
return AdminResendReplicationTasks(c, clientFactory)
},
},
{
Name: "delete",
Aliases: []string{"del"},
Expand Down
7 changes: 7 additions & 0 deletions tools/tdbg/util.go
Expand Up @@ -73,6 +73,13 @@ func getRequiredOption(c *cli.Context, optionName string) (string, error) {
return value, nil
}

func getRequiredInt64Option(c *cli.Context, optionName string) (int64, error) {
if !c.IsSet(optionName) {
return 0, fmt.Errorf("option is required: %s", optionName)
}
return c.Int64(optionName), nil
}

func parseTime(timeStr string, defaultValue time.Time, now time.Time) (time.Time, error) {
if len(timeStr) == 0 {
return defaultValue, nil
Expand Down

0 comments on commit cd1f846

Please sign in to comment.