Skip to content

Commit

Permalink
rpk: add 'cluster txn' commands
Browse files Browse the repository at this point in the history
Added:

rpk cluster txn
rpk cluster txn list
rpk cluster txn describe
rpk cluster txn describe-producers
rpk cluster txn find-hanging
rpk cluster txn abort

For #7308.
  • Loading branch information
twmb committed Jul 7, 2023
1 parent 06ce6d3 commit 7c9e37c
Show file tree
Hide file tree
Showing 7 changed files with 649 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/go/rpk/pkg/cli/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/partitions"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/selftest"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/storage"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/txn"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/group"
pkgconfig "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/spf13/afero"
Expand Down Expand Up @@ -45,6 +46,7 @@ func NewCommand(fs afero.Fs, p *pkgconfig.Params) *cobra.Command {
partitions.NewPartitionsCommand(fs, p),
selftest.NewSelfTestCommand(fs, p),
storage.NewCommand(fs, p),
txn.NewCommand(fs, p),
offsets,
)

Expand Down
94 changes: 94 additions & 0 deletions src/go/rpk/pkg/cli/cluster/txn/abort.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2022 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package txn

import (
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/kafka"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"github.com/twmb/franz-go/pkg/kadm"
)

// newAbortCommand returns the "abort" subcommand. The command looks up the
// producer on the given topic partition whose current transaction starts at
// the requested offset, then issues a WriteTxnMarkers abort (Commit: false)
// for that producer on that single partition and prints the per-partition
// result as a table.
//
// All three flags (--topic, --partition, --start-offset) are required.
func newAbortCommand(fs afero.Fs, p *config.Params) *cobra.Command {
	var (
		topic       string
		partition   int32
		startOffset int64
	)
	cmd := &cobra.Command{
		Use:   "abort",
		Short: "Abort an open transaction for a given partition",
		Long: `Abort an open transaction for a given partition.
This command aborts a single partition in a transaction, allowing the
last-stable-offset to advance for the partition. To be extra cautious, this
command only aborts one partition at a time. You can run this repeatedly to
abort all partitions that may be hanging in a single transaction.
You can find potentially hanging transactions with the find-hanging command.
`,

		Run: func(cmd *cobra.Command, _ []string) {
			// Reject nonsensical input before talking to the cluster. A
			// negative start offset is especially dangerous: per the Kafka
			// protocol, DescribeProducers reports -1 as the start offset of
			// a producer with no open transaction, so "-1" would match an
			// idle producer and we would write an abort marker for it.
			if partition < 0 {
				out.Die("invalid partition %d: partition must be non-negative", partition)
			}
			if startOffset < 0 {
				out.Die("invalid start offset %d: start offset must be non-negative", startOffset)
			}

			p, err := p.LoadVirtualProfile(fs)
			out.MaybeDie(err, "unable to load config: %v", err)

			adm, err := kafka.NewAdmin(fs, p)
			out.MaybeDie(err, "unable to initialize kafka client: %v", err)
			defer adm.Close()

			// The same single-partition set is used for both the lookup
			// and the abort marker.
			var s kadm.TopicsSet
			s.Add(topic, partition)
			producers, err := adm.DescribeProducers(cmd.Context(), s)
			out.HandleShardError("DescribeProducers", err)

			// Find the producer whose open transaction begins at the
			// requested offset. EachProducer cannot be stopped early, so
			// we simply ignore everything after the first match.
			var hung kadm.DescribedProducer
			var found bool
			producers.EachProducer(func(p kadm.DescribedProducer) {
				if !found && p.CurrentTxnStartOffset == startOffset {
					hung = p
					found = true
				}
			})
			if !found {
				out.Die("unable to find a hung transaction at start offset %d for topic %s partition %d", startOffset, topic, partition)
			}

			resp, err := adm.WriteTxnMarkers(cmd.Context(), kadm.TxnMarkers{
				ProducerID:       hung.ProducerID,
				ProducerEpoch:    hung.ProducerEpoch,
				Commit:           false, // false == abort
				CoordinatorEpoch: hung.CoordinatorEpoch,
				Topics:           s,
			})
			out.HandleShardError("WriteTxnMarkers", err)

			tw := out.NewTable("topic", "partition", "producer-id", "error")
			defer tw.Flush()
			resp.EachPartition(func(p kadm.TxnMarkersPartitionResponse) {
				tw.PrintStructFields(struct {
					Topic      string
					Partition  int32
					ProducerID int64
					Err        error
				}{p.Topic, p.Partition, p.ProducerID, p.Err})
			})
		},
	}
	cmd.Flags().StringVarP(&topic, "topic", "t", "", "Topic to abort a transaction for")
	cmd.Flags().Int32VarP(&partition, "partition", "p", -1, "Partition to abort a transaction for")
	cmd.Flags().Int64VarP(&startOffset, "start-offset", "o", -1, "Transaction start offset that the transaction you are aborting is hung at")
	cobra.MarkFlagRequired(cmd.Flags(), "topic")
	cobra.MarkFlagRequired(cmd.Flags(), "partition")
	cobra.MarkFlagRequired(cmd.Flags(), "start-offset")
	return cmd
}
123 changes: 123 additions & 0 deletions src/go/rpk/pkg/cli/cluster/txn/describe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2022 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package txn

import (
"fmt"
"strconv"
"strings"
"time"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/kafka"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"github.com/twmb/franz-go/pkg/kadm"
)

// newDescribeCommand returns the "describe" subcommand. It issues
// DescribeTransactions for the transactional IDs given as arguments and
// prints one table whose shape depends on --format:
//
//   - "short" / "text": one row per transaction, with all topics and their
//     partitions merged into a single "topics" column.
//   - "long" / "wide": one row per topic partition in each transaction.
//
// Any other format value terminates the command with an error.
func newDescribeCommand(fs afero.Fs, p *config.Params) *cobra.Command {
	var format string
	cmd := &cobra.Command{
		Use:   "describe [TXN-IDS...]",
		Short: "Describe transactional IDs",
		Long: `Describe transactional IDs.
This command, in comparison to 'list', is a more detailed per-transaction view
of transactional IDs. In addition to the state and producer ID, this command
also outputs when a transaction started, the epoch of the producer ID, how long
until the transaction times out, and the partitions currently a part of the
transaction. For information on what the columns in the output mean, see
'rpk cluster txn --help'.
By default, all topics in a transaction are merged into one line, with each
topic's partitions listed in brackets. To print a row per topic partition, use
--format=long.
If no transactional IDs are requested, all transactional IDs are printed.
`,

		Run: func(cmd *cobra.Command, txnIDs []string) {
			p, err := p.LoadVirtualProfile(fs)
			out.MaybeDie(err, "unable to load config: %v", err)

			adm, err := kafka.NewAdmin(fs, p)
			out.MaybeDie(err, "unable to initialize kafka client: %v", err)
			defer adm.Close()

			described, err := adm.DescribeTransactions(cmd.Context(), txnIDs...)
			out.HandleShardError("DescribeTransactions", err)

			// Columns shared by both output formats; each format appends
			// its own topic column(s).
			headers := []string{
				"coordinator",
				"transactional-id",
				"producer-id",
				"producer-epoch",
				"state",
				"start-timestamp",
				"timeout",
			}
			// common renders the shared columns for one transaction, in
			// the same order as headers. A fresh slice is returned on
			// every call, so callers may append to it freely.
			common := func(x kadm.DescribedTransaction) []interface{} {
				return out.StructFields(struct {
					Coordinator    int32
					TxnID          string
					ProducerID     int64
					ProducerEpoch  int16
					State          string
					StartTimestamp string
					Timeout        time.Duration
				}{
					x.Coordinator,
					x.TxnID,
					x.ProducerID,
					x.ProducerEpoch,
					x.State,
					time.UnixMilli(x.StartTimestamp).Format(rfc3339Milli),
					time.Duration(x.TimeoutMillis) * time.Millisecond,
				})
			}

			switch format {
			case "short", "text":
				tw := out.NewTable(append(headers, "topics")...)
				defer tw.Flush()
				for _, x := range described.Sorted() {
					// We format "foo[0;1;2],bar[1;2;3]".
					var ts []string
					for _, t := range x.Topics.Sorted() {
						ps := make([]string, 0, len(t.Partitions))
						for _, p := range t.Partitions {
							ps = append(ps, strconv.Itoa(int(p)))
						}
						ts = append(ts, fmt.Sprintf("%s[%s]", t.Topic, strings.Join(ps, ";")))
					}
					topics := strings.Join(ts, ",")
					tw.Print(append(common(x), topics)...)
				}

			case "long", "wide":
				tw := out.NewTable(append(headers, "topic", "partition")...)
				defer tw.Flush()
				for _, x := range described.Sorted() {
					var printed bool
					for _, t := range x.Topics.Sorted() {
						for _, p := range t.Partitions {
							tw.Print(append(common(x), t.Topic, p)...)
							printed = true
						}
					}
					// A transaction with no partitions would otherwise
					// vanish from the long output even though the short
					// format shows it; emit one row with empty topic and
					// partition columns so it stays visible.
					if !printed {
						tw.Print(append(common(x), "", "")...)
					}
				}
			default:
				out.Die("unrecognized format %q", format)
			}
		},
	}

	cmd.Flags().StringVar(&format, "format", "text", "Output format (short, long)")
	return cmd
}
131 changes: 131 additions & 0 deletions src/go/rpk/pkg/cli/cluster/txn/describe_producers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2022 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package txn

import (
"time"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/kafka"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"github.com/twmb/franz-go/pkg/kadm"
)

// newDescribeProducersCommand returns the "describe-producers" subcommand.
// It issues DescribeProducers for the topics/partitions selected with
// --topics and --partitions (an empty selection is passed through as an
// empty set) and prints one row per active producer. Partitions whose
// description failed get a single row carrying only the leader, topic,
// partition and the partition's error.
//
// Specifying --partitions without --topics is rejected.
func newDescribeProducersCommand(fs afero.Fs, p *config.Params) *cobra.Command {
	var (
		topics     []string
		partitions []int32
	)
	cmd := &cobra.Command{
		Use:   "describe-producers",
		Short: "Describe transactional producers to partitions",
		Long: `Describe transactional producers to partitions.
This command describes partitions that active transactional producers are
producing to. For more information on the producer ID and epoch columns, see
'rpk cluster txn --help'.
The last timestamp corresponds to the timestamp of the last record that was
written by the client. The transaction start offset corresponds to the offset
that the transaction is began at. All consumers configured to read only
committed records cannot read past the transaction start offset.
The output includes a few advanced fields that can be used for sanity checking:
the last sequence is the last sequence number that the producer has written,
and the coordinator epoch is the epoch of the broker that is being written to.
The last sequence should always go up and then wrap back to 0 at MaxInt32. The
coordinator epoch should remain fixed, or rarely, increase.
By default, all topics and partitions that have active producers are printed.
To filter for specific topics, use --topics. You can additionally filter by
partitions with --partitions.
`,

		Run: func(cmd *cobra.Command, _ []string) {
			p, err := p.LoadVirtualProfile(fs)
			out.MaybeDie(err, "unable to load config: %v", err)

			adm, err := kafka.NewAdmin(fs, p)
			out.MaybeDie(err, "unable to initialize kafka client: %v", err)
			defer adm.Close()

			if len(topics) == 0 && len(partitions) > 0 {
				out.Die("cannot specify partitions without any topics")
			}
			// The same partition filter is applied to every requested topic.
			var s kadm.TopicsSet
			for _, topic := range topics {
				s.Add(topic, partitions...)
			}

			described, err := adm.DescribeProducers(cmd.Context(), s)
			out.HandleShardError("DescribeProducers", err)

			tw := out.NewTable(
				"leader",
				"topic",
				"partition",
				"producer-id",
				"producer-epoch",
				"last-timestamp",
				"last-sequence",
				"coordinator-epoch",
				"txn-start-offset",
				"error",
			)
			defer tw.Flush()

			// Field order must match the table headers above.
			type fields struct {
				Leader           int32
				Topic            string
				Partition        int32
				ProducerID       int64
				ProducerEpoch    int16
				LastTimestamp    string
				LastSequence     int32
				CoordinatorEpoch int32
				TxnStartOffset   int64
				Err              error
			}

			for _, d := range described.SortedPartitions() {
				if d.Err != nil {
					// Report the partition's own error, not the
					// request-level err returned by DescribeProducers
					// above, then move on: a failed partition has no
					// producers to list.
					tw.PrintStructFields(fields{
						Leader:    d.Leader,
						Topic:     d.Topic,
						Partition: d.Partition,
						Err:       d.Err,
					})
					continue
				}
				for _, p := range d.ActiveProducers.Sorted() {
					tw.PrintStructFields(fields{
						Leader:           p.Leader,
						Topic:            p.Topic,
						Partition:        p.Partition,
						ProducerID:       p.ProducerID,
						ProducerEpoch:    p.ProducerEpoch,
						LastSequence:     p.LastSequence,
						LastTimestamp:    time.UnixMilli(p.LastTimestamp).Format(rfc3339Milli),
						CoordinatorEpoch: p.CoordinatorEpoch,
						TxnStartOffset:   p.CurrentTxnStartOffset,
					})
				}
			}
		},
	}

	cmd.Flags().StringSliceVarP(&topics, "topics", "t", nil, "Topic to describe producers for (repeatable)")
	cmd.Flags().Int32SliceVarP(&partitions, "partitions", "p", nil, "Partitions to describe producers for (repeatable)")
	return cmd
}
Loading

0 comments on commit 7c9e37c

Please sign in to comment.