116 changes: 81 additions & 35 deletions README.md
@@ -23,12 +23,6 @@ non-Kafka-related contexts.
See [this blog post](https://segment.com/blog/easier-management-of-Kafka-topics-with-topicctl/) for
more details.

## Roadmap

We are planning on making some changes to (optionally) remove the ZK dependency and also to support
some additional security features like TLS. See
[this page](https://github.com/segmentio/topicctl/wiki/v1-Plan) for the current plan.

## Getting started

### Installation
@@ -69,7 +63,7 @@ topicctl apply --skip-confirm examples/local-cluster/topics/*yaml
4. Send some test messages to the `topic-default` topic:

```
topicctl tester --zk-addr=localhost:2181 --topic=topic-default
topicctl tester --broker-addr=localhost:9092 --topic=topic-default
```

5. Open up the repl (while keeping the tester running in a separate terminal):
@@ -200,13 +194,14 @@ only.

### Specifying the target cluster

There are two patterns for specifying a target cluster in the `topicctl` subcommands:
There are three ways to specify a target cluster in the `topicctl` subcommands:

1. `--cluster-config=[path]`, where the referenced path is a cluster configuration
in the format expected by the `apply` command described above *or*
2. `--zk-addr=[zookeeper address]` and `--zk-prefix=[optional prefix for cluster in zookeeper]`
in the format expected by the `apply` command described above,
2. `--zk-addr=[zookeeper address]` and `--zk-prefix=[optional prefix for cluster in zookeeper]`, *or*
3. `--broker-addr=[bootstrap broker address]`

All subcommands support the `cluster-config` pattern. The second is also supported
All subcommands support the `cluster-config` pattern. The last two are also supported
by the `get`, `repl`, `reset-offsets`, and `tail` subcommands since these can be run
independently of an `apply` workflow.
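
As a rough sketch (reusing addresses from the examples in this README, with `get topics` as a
stand-in subcommand and an illustrative cluster config path), the three forms look like:

```
# 1. Via a cluster config file (illustrative path)
topicctl get topics --cluster-config=examples/local-cluster/cluster.yaml

# 2. Via ZooKeeper, with an optional prefix for the cluster's nodes
topicctl get topics --zk-addr=localhost:2181 --zk-prefix=my-cluster

# 3. Via a bootstrap broker address
topicctl get topics --broker-addr=localhost:9092
```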

@@ -222,9 +217,9 @@ typically source-controlled so that changes can be reviewed before being applied

### Clusters

Each cluster associated with a managed topic must have a config. These
configs can also be used with the `get`, `repl`, and `tail` subcommands instead
of specifying a ZooKeeper address.
Each cluster associated with a managed topic must have a config. These configs can also be used
with the `get`, `repl`, `reset-offsets`, and `tail` subcommands instead of specifying a broker or
ZooKeeper address.

The following shows an annotated example:

@@ -237,15 +232,30 @@ meta:
Test cluster for topicctl.

spec:
versionMajor: v0.10 # Version
bootstrapAddrs: # One or more broker bootstrap addresses
- my-cluster.example.com:9092
zkAddrs: # One or more cluster zookeeper addresses
- zk.example.com:2181
zkPrefix: my-cluster # Prefix for zookeeper nodes
clusterID: abc-123-xyz # Expected cluster ID for cluster (optional, used as safety check only)

# ZooKeeper access settings (required for pre-v2 clusters)
zkAddrs: # One or more cluster zookeeper addresses; if these are
- zk.example.com:2181 # omitted, then the cluster will only be accessed via broker APIs;
# see the section below on cluster access for more details.
zkPrefix: my-cluster # Prefix for zookeeper nodes if using zookeeper access
zkLockPath: /topicctl/locks # Path used for apply locks (optional)
clusterID: abc-123-xyz # Expected cluster ID for cluster (optional, used as
# safety check only)

# TLS/SSL settings (optional, not supported if using ZooKeeper)
tls:
enabled: true # Whether TLS is enabled
caCertPath: path/to/ca.crt # Path to CA cert to be used (optional)
certPath: path/to/client.crt # Path to client cert to be used (optional)
keyPath: path/to/client.key # Path to client key to be used (optional)

# SASL settings (optional, not supported if using ZooKeeper)
sasl:
enabled: true # Whether SASL is enabled
mechanism: SCRAM-SHA-512 # Mechanism to use; choices are PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512
username: my-username # Username; can also be set via TOPICCTL_SASL_USERNAME environment variable
password: my-password # Password; can also be set via TOPICCTL_SASL_PASSWORD environment variable
```
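
As a usage sketch, a config like the one above is passed to subcommands via the `--cluster-config`
flag; the per-command flag definitions shown in the diff below also read a default from the
`TOPICCTL_CLUSTER_CONFIG` environment variable, so the second form should be equivalent:

```
# Point a subcommand at the cluster described by the config above
topicctl get brokers --cluster-config=path/to/cluster.yaml

# Equivalent, with the path supplied through the environment
TOPICCTL_CLUSTER_CONFIG=path/to/cluster.yaml topicctl get brokers
```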

Note that the `name`, `environment`, `region`, and `description` fields are used
@@ -355,7 +365,7 @@ The `apply` subcommand can make changes, but under the following conditions:
7. Partition replica migrations are protected via
["throttles"](https://kafka.apache.org/0101/documentation.html#rep-throttle)
to prevent the cluster network from getting overwhelmed
8. Before applying, the tool checks the cluster ID in ZooKeeper against the expected value in the
8. Before applying, the tool checks the cluster ID against the expected value in the
cluster config. This can help prevent errors around applying in the wrong cluster when multiple
clusters are accessed through the same address, e.g. `localhost:2181`.

@@ -376,30 +386,66 @@ the process should continue from where it left off.

## Cluster access details

Most `topicctl` functionality interacts with the cluster through ZooKeeper. Currently, only
the following depend on broker APIs:
### ZooKeeper vs. broker APIs

`topicctl` can interact with a cluster either through ZooKeeper or by hitting broker APIs
directly.

Broker APIs are used exclusively if the tool is run with either of the following:

1. `--broker-addr` *or*
2. `--cluster-config` and the cluster config doesn't specify any ZK addresses

In all other cases, i.e. if `--zk-addr` is specified or the cluster config has ZK addresses, then
ZooKeeper will be used for most interactions. A few operations that are not possible via ZK
will still use broker APIs, however, including:

1. Group-related `get` commands: `get groups`, `get lags`, `get members`
2. `get offsets`
3. `reset-offsets`
4. `tail`
5. `apply` with topic creation

In the future, we may shift more functionality away from ZooKeeper, at least for newer cluster
versions; see the "Feature roadmap" section below for more details.
### Limitations of broker-only access mode

There are a few limitations in the tool when using the broker APIs exclusively:

1. Only newer versions of Kafka are supported. In particular:
- v2.0 or greater is required for read-only operations (`get brokers`, `get topics`, etc.)
- v2.4 or greater is required for applying topic changes
2. Apply locking is not yet implemented; please be careful when applying to ensure that someone
else isn't applying changes to the same topic at the same time.
3. The values of some dynamic broker properties, e.g. `leader.replication.throttled.rate`, are not
returned by the API and thus won't appear in the tool output. This appears to be fixed in v2.6.

### TLS

TLS (referred to by the older name "SSL" in the Kafka documentation) is supported when running
`topicctl` in the exclusive broker API mode. To use it, either set `--tls-enabled` on the
command line or, if using a cluster config, set `enabled: true` in the config's `tls` section.

In addition to standard TLS, the tool also supports mutual TLS using custom certs, keys, and CA
certs (in PEM format). As with enabling TLS, these can be configured either on the command line
or in a cluster config. See [this config](examples/auth/cluster.yaml) for an example.
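
As a sketch (only `--tls-enabled` is named here as a command-line flag; the mutual-TLS paths are
the ones from the annotated cluster config example above):

```
# Plain TLS against a broker, enabled from the command line
topicctl get brokers --broker-addr=my-cluster.example.com:9092 --tls-enabled

# Mutual TLS via a cluster config whose tls section sets caCertPath, certPath,
# and keyPath (the linked examples/auth/cluster.yaml shows a full example)
topicctl get brokers --cluster-config=examples/auth/cluster.yaml
```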

### SASL

`topicctl` supports SASL authentication when running in the exclusive broker API mode. To use it,
either set the `--sasl-mechanism`, `--sasl-username`, and `--sasl-password` flags on the command
line or fill out the `sasl` section of the cluster config.

If using the cluster config, the username and password can still be set on the command line
or via the `TOPICCTL_SASL_USERNAME` and `TOPICCTL_SASL_PASSWORD` environment variables.

## Feature roadmap
The tool currently supports the following SASL mechanisms:

The following are in the medium-term roadmap:
1. `PLAIN`
2. `SCRAM-SHA-256`
3. `SCRAM-SHA-512`

1. **Use broker APIs exclusively for newer cluster versions:** This is needed for a
[future world](https://www.confluent.io/blog/removing-zookeeper-dependency-in-kafka/)
where Kafka doesn't use ZooKeeper at all. Even before that happens, though, doing everything
through broker APIs simplifies the configuration and is also needed to run `topicctl` in
environments where users aren't given direct ZK access.
2. **Support TLS for communication with cluster:** This is fairly straightforward assuming
that (1) is done. It allows `topicctl` to be run in environments that don't permit insecure
cluster access.
Note that SASL can be run either with or without TLS, although the former is generally more
secure.
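
Putting the above together, a rough sketch of both the flag-based and environment-variable forms
(the address and credentials are the illustrative values from the cluster config example):

```
# SASL over the broker API, with everything passed as flags
topicctl get topics \
  --broker-addr=my-cluster.example.com:9092 \
  --sasl-mechanism=SCRAM-SHA-512 \
  --sasl-username=my-username \
  --sasl-password=my-password

# With a cluster config, credentials can instead come from the environment
export TOPICCTL_SASL_USERNAME=my-username
export TOPICCTL_SASL_PASSWORD=my-password
topicctl get topics --cluster-config=path/to/cluster.yaml
```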

## Development

22 changes: 12 additions & 10 deletions cmd/topicctl/subcmd/apply.go
@@ -28,7 +28,6 @@ var applyCmd = &cobra.Command{
type applyCmdConfig struct {
brokersToRemove []int
brokerThrottleMBsOverride int
clusterConfig string
dryRun bool
partitionBatchSizeOverride int
pathPrefix string
@@ -37,6 +36,8 @@ type applyCmdConfig struct {
skipConfirm bool
sleepLoopDuration time.Duration

shared sharedOptions

retentionDropStepDuration time.Duration
}

@@ -55,12 +56,6 @@ func init() {
0,
"Broker throttle override (MB/sec)",
)
applyCmd.Flags().StringVar(
&applyConfig.clusterConfig,
"cluster-config",
os.Getenv("TOPICCTL_CLUSTER_CONFIG"),
"Cluster config path",
)
applyCmd.Flags().BoolVar(
&applyConfig.dryRun,
"dry-run",
@@ -104,6 +99,7 @@ func init() {
"Amount of time to wait between partition checks",
)

addSharedConfigOnlyFlags(applyCmd, &applyConfig.shared)
RootCmd.AddCommand(applyCmd)
}

@@ -191,7 +187,13 @@ func applyTopic(

adminClient, ok := adminClients[clusterConfigPath]
if !ok {
adminClient, err = clusterConfig.NewAdminClient(ctx, nil, applyConfig.dryRun)
adminClient, err = clusterConfig.NewAdminClient(
ctx,
nil,
applyConfig.dryRun,
applyConfig.shared.saslUsername,
applyConfig.shared.saslPassword,
)
if err != nil {
return err
}
@@ -231,8 +233,8 @@ }
}

func clusterConfigForTopicApply(topicConfigPath string) (string, error) {
if applyConfig.clusterConfig != "" {
return applyConfig.clusterConfig, nil
if applyConfig.shared.clusterConfig != "" {
return applyConfig.shared.clusterConfig, nil
}

return filepath.Abs(
22 changes: 11 additions & 11 deletions cmd/topicctl/subcmd/bootstrap.go
@@ -2,7 +2,6 @@ package subcmd

import (
"context"
"os"

"github.com/segmentio/topicctl/pkg/cli"
"github.com/segmentio/topicctl/pkg/config"
@@ -17,22 +16,17 @@ var bootstrapCmd = &cobra.Command{
}

type bootstrapCmdConfig struct {
clusterConfig string
matchRegexp string
excludeRegexp string
outputDir string
overwrite bool

shared sharedOptions
}

var bootstrapConfig bootstrapCmdConfig

func init() {
bootstrapCmd.Flags().StringVar(
&bootstrapConfig.clusterConfig,
"cluster-config",
os.Getenv("TOPICCTL_CLUSTER_CONFIG"),
"Cluster config",
)
bootstrapCmd.Flags().StringVar(
&bootstrapConfig.matchRegexp,
"match",
@@ -59,20 +53,26 @@ func init() {
"Overwrite existing configs in output directory",
)

addSharedConfigOnlyFlags(bootstrapCmd, &bootstrapConfig.shared)
bootstrapCmd.MarkFlagRequired("cluster-config")

RootCmd.AddCommand(bootstrapCmd)
}

func bootstrapRun(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

clusterConfig, err := config.LoadClusterFile(bootstrapConfig.clusterConfig)
clusterConfig, err := config.LoadClusterFile(bootstrapConfig.shared.clusterConfig)
if err != nil {
return err
}
adminClient, err := clusterConfig.NewAdminClient(ctx, nil, true)
adminClient, err := clusterConfig.NewAdminClient(
ctx,
nil,
true,
bootstrapConfig.shared.saslUsername,
bootstrapConfig.shared.saslPassword,
)
if err != nil {
return err
}
28 changes: 15 additions & 13 deletions cmd/topicctl/subcmd/check.go
@@ -21,21 +21,16 @@ var checkCmd = &cobra.Command{
}

type checkCmdConfig struct {
clusterConfig string
checkLeaders bool
pathPrefix string
validateOnly bool
checkLeaders bool
pathPrefix string
validateOnly bool

shared sharedOptions
}

var checkConfig checkCmdConfig

func init() {
checkCmd.Flags().StringVar(
&checkConfig.clusterConfig,
"cluster-config",
os.Getenv("TOPICCTL_CLUSTER_CONFIG"),
"Cluster config",
)
checkCmd.Flags().StringVar(
&checkConfig.pathPrefix,
"path-prefix",
@@ -55,6 +50,7 @@ func init() {
"Validate configs only, without connecting to cluster",
)

addSharedConfigOnlyFlags(checkCmd, &checkConfig.shared)
RootCmd.AddCommand(checkCmd)
}

@@ -136,7 +132,13 @@ func checkTopicFile(
var ok bool
adminClient, ok = adminClients[clusterConfigPath]
if !ok {
adminClient, err = clusterConfig.NewAdminClient(ctx, nil, true)
adminClient, err = clusterConfig.NewAdminClient(
ctx,
nil,
true,
checkConfig.shared.saslUsername,
checkConfig.shared.saslPassword,
)
if err != nil {
return false, err
}
@@ -177,8 +179,8 @@ }
}

func clusterConfigForTopicCheck(topicConfigPath string) (string, error) {
if checkConfig.clusterConfig != "" {
return checkConfig.clusterConfig, nil
if checkConfig.shared.clusterConfig != "" {
return checkConfig.shared.clusterConfig, nil
}

return filepath.Abs(