diff --git a/README.md b/README.md
index 8275ccc9..a9006dba 100644
--- a/README.md
+++ b/README.md
@@ -23,12 +23,6 @@ non-Kafka-related contexts.
 See [this blog post](https://segment.com/blog/easier-management-of-Kafka-topics-with-topicctl/)
 for more details.
 
-## Roadmap
-
-We are planning on making some changes to (optionally) remove the ZK dependency and also to support
-some additional security features like TLS. See
-[this page](https://github.com/segmentio/topicctl/wiki/v1-Plan) for the current plan.
-
 ## Getting started
 
 ### Installation
@@ -69,7 +63,7 @@ topicctl apply --skip-confirm examples/local-cluster/topics/*yaml
 4. Send some test messages to the `topic-default` topic:
 
 ```
-topicctl tester --zk-addr=localhost:2181 --topic=topic-default
+topicctl tester --broker-addr=localhost:9092 --topic=topic-default
 ```
 
 5. Open up the repl (while keeping the tester running in a separate terminal):
@@ -200,13 +194,14 @@ only.
 
 ### Specifying the target cluster
 
-There are two patterns for specifying a target cluster in the `topicctl` subcommands:
+There are three ways to specify a target cluster in the `topicctl` subcommands:
 
 1. `--cluster-config=[path]`, where the referenced path is a cluster configuration
-  in the format expected by the `apply` command described above *or*
-2. `--zk-addr=[zookeeper address]` and `--zk-prefix=[optional prefix for cluster in zookeeper]`
+  in the format expected by the `apply` command described above,
+2. `--zk-addr=[zookeeper address]` and `--zk-prefix=[optional prefix for cluster in zookeeper]`, *or*
+3. `--broker-addr=[bootstrap broker address]`
 
-All subcommands support the `cluster-config` pattern. The second is also supported
+All subcommands support the `cluster-config` pattern. The last two are also supported
 by the `get`, `repl`, `reset-offsets`, and `tail` subcommands since these can be run
 independently of an `apply` workflow.
 
@@ -222,9 +217,9 @@ typically source-controlled so that changes can be reviewed before being
 applied.
 
 ### Clusters
 
-Each cluster associated with a managed topic must have a config. These
-configs can also be used with the `get`, `repl`, and `tail` subcommands instead
-of specifying a ZooKeeper address.
+Each cluster associated with a managed topic must have a config. These configs can also be used
+with the `get`, `repl`, `reset-offsets`, and `tail` subcommands instead of specifying a broker or
+ZooKeeper address.
 
 The following shows an annotated example:
 
@@ -237,15 +232,30 @@ meta:
     Test cluster for topicctl.
 
 spec:
-  versionMajor: v0.10          # Version
   bootstrapAddrs:              # One or more broker bootstrap addresses
     - my-cluster.example.com:9092
-  zkAddrs:                     # One or more cluster zookeeper addresses
-    - zk.example.com:2181
-  zkPrefix: my-cluster         # Prefix for zookeeper nodes
+  clusterID: abc-123-xyz       # Expected cluster ID for cluster (optional, used as safety check only)
+
+  # ZooKeeper access settings (required for pre-v2 clusters)
+  zkAddrs:                     # One or more cluster zookeeper addresses; if these are
+    - zk.example.com:2181      #   omitted, then the cluster will only be accessed via broker APIs;
+                               #   see the section below on cluster access for more details.
+ zkPrefix: my-cluster # Prefix for zookeeper nodes if using zookeeper access zkLockPath: /topicctl/locks # Path used for apply locks (optional) - clusterID: abc-123-xyz # Expected cluster ID for cluster (optional, used as - # safety check only) + + # TLS/SSL settings (optional, not supported if using ZooKeeper) + tls: + enabled: true # Whether TLS is enabled + caCertPath: path/to/ca.crt # Path to CA cert to be used (optional) + certPath: path/to/client.crt # Path to client cert to be used (optional) + keyPath: path/to/client.key # Path to client key to be used (optional) + + # SASL settings (optional, not supported if using ZooKeeper) + sasl: + enabled: true # Whether SASL is enabled + mechanism: SCRAM-SHA-512 # Mechanism to use; choices are PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512 + username: my-username # Username; can also be set via TOPICCTL_SASL_USERNAME environment variable + password: my-password # Password; can also be set via TOPICCTL_SASL_PASSWORD environment variable ``` Note that the `name`, `environment`, `region`, and `description` fields are used @@ -355,7 +365,7 @@ The `apply` subcommand can make changes, but under the following conditions: 7. Partition replica migrations are protected via ["throttles"](https://kafka.apache.org/0101/documentation.html#rep-throttle) to prevent the cluster network from getting overwhelmed -8. Before applying, the tool checks the cluster ID in ZooKeeper against the expected value in the +8. Before applying, the tool checks the cluster ID against the expected value in the cluster config. This can help prevent errors around applying in the wrong cluster when multiple clusters are accessed through the same address, e.g `localhost:2181`. @@ -376,8 +386,19 @@ the process should continue from where it left off. ## Cluster access details -Most `topicctl` functionality interacts with the cluster through ZooKeeper. Currently, only -the following depend on broker APIs: +### ZooKeeper vs. broker APIs + +`topicctl` can interact with a cluster through either ZooKeeper or by hitting broker APIs +directly. + +Broker APIs are used exclusively if the tool is run with either of the following flags: + +1. `--broker-addr` *or* +2. `--cluster-config` and the cluster config doesn't specify any ZK addresses + +In all other cases, i.e. if `--zk-addr` is specified or the cluster config has ZK addresses, then +ZooKeeper will be used for most interactions. A few operations that are not possible via ZK +will still use broker APIs, however, including: 1. Group-related `get` commands: `get groups`, `get lags`, `get members` 2. `get offsets` @@ -385,21 +406,46 @@ the following depend on broker APIs: 4. `tail` 5. `apply` with topic creation -In the future, we may shift more functionality away from ZooKeeper, at least for newer cluster -versions; see the "Feature roadmap" section below for more details. +### Limitations of broker-only access mode + +There are a few limitations in the tool when using the broker APIs exclusively: + +1. Only newer versions of Kafka are supported. In particular: + - v2.0 or greater is required for read-only operations (`get brokers`, `get topics`, etc.) + - v2.4 or greater is required for applying topic changes +2. Apply locking is not yet implemented; please be careful when applying to ensure that someone + else isn't applying changes in the same topic at the same time. +3. The values of some dynamic broker properties, e.g. `leader.replication.throttled.rate`, are not + returned by the API and thus won't appear in the tool output. 
This appears to be fixed in v2.6.
+
+### TLS
+
+TLS (referred to by the older name "SSL" in the Kafka documentation) is supported when running
+`topicctl` in the exclusive broker API mode. To use this, either set `--tls-enabled` on the
+command line or, if using a cluster config, set `enabled: true` in the `tls` section of that
+config.
+
+In addition to standard TLS, the tool also supports mutual TLS using custom certs, keys, and CA
+certs (in PEM format). As with the enabling of TLS, these can be configured either on the
+command line or in a cluster config. See [this config](examples/auth/cluster.yaml) for an example.
+
+### SASL
+
+`topicctl` supports SASL authentication when running in the exclusive broker API mode. To use this,
+either set the `--sasl-mechanism`, `--sasl-username`, and `--sasl-password` flags on the command
+line or fill out the `sasl` section of the cluster config.
+
+If using the cluster config, the username and password can still be set on the command line
+or via the `TOPICCTL_SASL_USERNAME` and `TOPICCTL_SASL_PASSWORD` environment variables.
 
-## Feature roadmap
+The tool currently supports the following SASL mechanisms:
 
-The following are in the medium-term roadmap:
+1. `PLAIN`
+2. `SCRAM-SHA-256`
+3. `SCRAM-SHA-512`
 
-1. **Use broker APIs exclusively for newer cluster versions:** This is needed for a
-   [future world](https://www.confluent.io/blog/removing-zookeeper-dependency-in-kafka/)
-   where Kafka doesn't use ZooKeeper at all. Even before that happens, though, doing everything
-   through broker APIs simplifies the configuration and is also needed to run `topicctl` in
-   environments where users aren't given direct ZK access.
-2. **Support TLS for communication with cluster:** This is fairly straightforward assuming
-   that (1) is done. It allows `topicctl` to be run in environments that don't permit insecure
-   cluster access.
+Note that SASL can be run either with or without TLS, although running it over TLS is generally
+more secure.
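+
+For example, the following (a sketch only; the port, cert path, and credentials come from the
+docker-compose-auth.yml setup in this repo and will differ in other environments) tails a topic
+over SASL and TLS:
+
+```
+topicctl tail topic-default \
+  --broker-addr=localhost:9095 \
+  --tls-enabled \
+  --tls-ca-cert=examples/auth/certs/ca.crt \
+  --tls-skip-verify \
+  --sasl-mechanism=SCRAM-SHA-512 \
+  --sasl-username=adminscram \
+  --sasl-password=admin-secret-512
+```
 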
## Development diff --git a/cmd/topicctl/subcmd/apply.go b/cmd/topicctl/subcmd/apply.go index b04fb2e0..cfba8749 100644 --- a/cmd/topicctl/subcmd/apply.go +++ b/cmd/topicctl/subcmd/apply.go @@ -28,7 +28,6 @@ var applyCmd = &cobra.Command{ type applyCmdConfig struct { brokersToRemove []int brokerThrottleMBsOverride int - clusterConfig string dryRun bool partitionBatchSizeOverride int pathPrefix string @@ -37,6 +36,8 @@ type applyCmdConfig struct { skipConfirm bool sleepLoopDuration time.Duration + shared sharedOptions + retentionDropStepDuration time.Duration } @@ -55,12 +56,6 @@ func init() { 0, "Broker throttle override (MB/sec)", ) - applyCmd.Flags().StringVar( - &applyConfig.clusterConfig, - "cluster-config", - os.Getenv("TOPICCTL_CLUSTER_CONFIG"), - "Cluster config path", - ) applyCmd.Flags().BoolVar( &applyConfig.dryRun, "dry-run", @@ -104,6 +99,7 @@ func init() { "Amount of time to wait between partition checks", ) + addSharedConfigOnlyFlags(applyCmd, &applyConfig.shared) RootCmd.AddCommand(applyCmd) } @@ -191,7 +187,13 @@ func applyTopic( adminClient, ok := adminClients[clusterConfigPath] if !ok { - adminClient, err = clusterConfig.NewAdminClient(ctx, nil, applyConfig.dryRun) + adminClient, err = clusterConfig.NewAdminClient( + ctx, + nil, + applyConfig.dryRun, + applyConfig.shared.saslUsername, + applyConfig.shared.saslPassword, + ) if err != nil { return err } @@ -231,8 +233,8 @@ func applyTopic( } func clusterConfigForTopicApply(topicConfigPath string) (string, error) { - if applyConfig.clusterConfig != "" { - return applyConfig.clusterConfig, nil + if applyConfig.shared.clusterConfig != "" { + return applyConfig.shared.clusterConfig, nil } return filepath.Abs( diff --git a/cmd/topicctl/subcmd/bootstrap.go b/cmd/topicctl/subcmd/bootstrap.go index cc0fe14a..6110d5ae 100644 --- a/cmd/topicctl/subcmd/bootstrap.go +++ b/cmd/topicctl/subcmd/bootstrap.go @@ -2,7 +2,6 @@ package subcmd import ( "context" - "os" "github.com/segmentio/topicctl/pkg/cli" "github.com/segmentio/topicctl/pkg/config" @@ -17,22 +16,17 @@ var bootstrapCmd = &cobra.Command{ } type bootstrapCmdConfig struct { - clusterConfig string matchRegexp string excludeRegexp string outputDir string overwrite bool + + shared sharedOptions } var bootstrapConfig bootstrapCmdConfig func init() { - bootstrapCmd.Flags().StringVar( - &bootstrapConfig.clusterConfig, - "cluster-config", - os.Getenv("TOPICCTL_CLUSTER_CONFIG"), - "Cluster config", - ) bootstrapCmd.Flags().StringVar( &bootstrapConfig.matchRegexp, "match", @@ -59,8 +53,8 @@ func init() { "Overwrite existing configs in output directory", ) + addSharedConfigOnlyFlags(bootstrapCmd, &bootstrapConfig.shared) bootstrapCmd.MarkFlagRequired("cluster-config") - RootCmd.AddCommand(bootstrapCmd) } @@ -68,11 +62,17 @@ func bootstrapRun(cmd *cobra.Command, args []string) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - clusterConfig, err := config.LoadClusterFile(bootstrapConfig.clusterConfig) + clusterConfig, err := config.LoadClusterFile(bootstrapConfig.shared.clusterConfig) if err != nil { return err } - adminClient, err := clusterConfig.NewAdminClient(ctx, nil, true) + adminClient, err := clusterConfig.NewAdminClient( + ctx, + nil, + true, + bootstrapConfig.shared.saslUsername, + bootstrapConfig.shared.saslPassword, + ) if err != nil { return err } diff --git a/cmd/topicctl/subcmd/check.go b/cmd/topicctl/subcmd/check.go index 48de5c3e..33f2aaf1 100644 --- a/cmd/topicctl/subcmd/check.go +++ b/cmd/topicctl/subcmd/check.go @@ -21,21 +21,16 @@ var 
checkCmd = &cobra.Command{ } type checkCmdConfig struct { - clusterConfig string - checkLeaders bool - pathPrefix string - validateOnly bool + checkLeaders bool + pathPrefix string + validateOnly bool + + shared sharedOptions } var checkConfig checkCmdConfig func init() { - checkCmd.Flags().StringVar( - &checkConfig.clusterConfig, - "cluster-config", - os.Getenv("TOPICCTL_CLUSTER_CONFIG"), - "Cluster config", - ) checkCmd.Flags().StringVar( &checkConfig.pathPrefix, "path-prefix", @@ -55,6 +50,7 @@ func init() { "Validate configs only, without connecting to cluster", ) + addSharedConfigOnlyFlags(checkCmd, &checkConfig.shared) RootCmd.AddCommand(checkCmd) } @@ -136,7 +132,13 @@ func checkTopicFile( var ok bool adminClient, ok = adminClients[clusterConfigPath] if !ok { - adminClient, err = clusterConfig.NewAdminClient(ctx, nil, true) + adminClient, err = clusterConfig.NewAdminClient( + ctx, + nil, + true, + checkConfig.shared.saslUsername, + checkConfig.shared.saslPassword, + ) if err != nil { return false, err } @@ -177,8 +179,8 @@ func checkTopicFile( } func clusterConfigForTopicCheck(topicConfigPath string) (string, error) { - if checkConfig.clusterConfig != "" { - return checkConfig.clusterConfig, nil + if checkConfig.shared.clusterConfig != "" { + return checkConfig.shared.clusterConfig, nil } return filepath.Abs( diff --git a/cmd/topicctl/subcmd/get.go b/cmd/topicctl/subcmd/get.go index cd901de5..bd2d7ab3 100644 --- a/cmd/topicctl/subcmd/get.go +++ b/cmd/topicctl/subcmd/get.go @@ -2,15 +2,11 @@ package subcmd import ( "context" - "errors" "fmt" - "os" "strings" "github.com/aws/aws-sdk-go/aws/session" - "github.com/segmentio/topicctl/pkg/admin" "github.com/segmentio/topicctl/pkg/cli" - "github.com/segmentio/topicctl/pkg/config" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -33,100 +29,36 @@ var getCmd = &cobra.Command{ } type getCmdConfig struct { - brokerAddr string - clusterConfig string - full bool - zkAddr string - zkPrefix string + full bool + + shared sharedOptions } var getConfig getCmdConfig func init() { - getCmd.Flags().StringVar( - &getConfig.brokerAddr, - "broker-addr", - "", - "Broker address", - ) - getCmd.Flags().StringVar( - &getConfig.clusterConfig, - "cluster-config", - os.Getenv("TOPICCTL_CLUSTER_CONFIG"), - "Cluster config", - ) getCmd.Flags().BoolVar( &getConfig.full, "full", false, "Show more full information for resources", ) - getCmd.Flags().StringVarP( - &getConfig.zkAddr, - "zk-addr", - "z", - "", - "ZooKeeper address", - ) - getCmd.Flags().StringVar( - &getConfig.zkPrefix, - "zk-prefix", - "", - "Prefix for cluster-related nodes in zk", - ) + addSharedFlags(getCmd, &getConfig.shared) RootCmd.AddCommand(getCmd) } func getPreRun(cmd *cobra.Command, args []string) error { - if getConfig.clusterConfig == "" && getConfig.zkAddr == "" && getConfig.brokerAddr == "" { - return errors.New("Must set either broker-addr, cluster-config, zk address") - } - if getConfig.clusterConfig != "" && - (getConfig.zkAddr != "" || getConfig.zkPrefix != "" || getConfig.brokerAddr != "") { - log.Warn("broker and zk flags are ignored when using cluster-config") - } - - return nil + return getConfig.shared.validate() } func getRun(cmd *cobra.Command, args []string) error { ctx := context.Background() sess := session.Must(session.NewSession()) - var adminClient admin.Client - var clientErr error - - if getConfig.clusterConfig != "" { - clusterConfig, err := config.LoadClusterFile(getConfig.clusterConfig) - if err != nil { - return err - } - adminClient, clientErr = 
clusterConfig.NewAdminClient(ctx, sess, true) - } else if getConfig.brokerAddr != "" { - adminClient, clientErr = admin.NewBrokerAdminClient( - ctx, - admin.BrokerAdminClientConfig{ - BrokerAddr: getConfig.brokerAddr, - ReadOnly: true, - }, - ) - } else { - adminClient, clientErr = admin.NewZKAdminClient( - ctx, - admin.ZKAdminClientConfig{ - ZKAddrs: []string{getConfig.zkAddr}, - ZKPrefix: getConfig.zkPrefix, - Sess: sess, - // Run in read-only mode to ensure that tailing doesn't make any changes - // in the cluster - ReadOnly: true, - }, - ) - } - - if clientErr != nil { - return clientErr + adminClient, err := getConfig.shared.getAdminClient(ctx, sess, true) + if err != nil { + return err } defer adminClient.Close() diff --git a/cmd/topicctl/subcmd/repl.go b/cmd/topicctl/subcmd/repl.go index e8b47c03..c3f8aaf5 100644 --- a/cmd/topicctl/subcmd/repl.go +++ b/cmd/topicctl/subcmd/repl.go @@ -2,14 +2,9 @@ package subcmd import ( "context" - "errors" - "os" "github.com/aws/aws-sdk-go/aws/session" - "github.com/segmentio/topicctl/pkg/admin" "github.com/segmentio/topicctl/pkg/cli" - "github.com/segmentio/topicctl/pkg/config" - log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -21,94 +16,27 @@ var replCmd = &cobra.Command{ } type replCmdConfig struct { - brokerAddr string - clusterConfig string - zkAddr string - zkPrefix string + shared sharedOptions } var replConfig replCmdConfig func init() { - replCmd.Flags().StringVar( - &replConfig.brokerAddr, - "broker-addr", - "", - "Broker address", - ) - replCmd.Flags().StringVar( - &replConfig.clusterConfig, - "cluster-config", - os.Getenv("TOPICCTL_CLUSTER_CONFIG"), - "Cluster config", - ) - replCmd.Flags().StringVarP( - &replConfig.zkAddr, - "zk-addr", - "z", - "", - "ZooKeeper address", - ) - replCmd.Flags().StringVar( - &replConfig.zkPrefix, - "zk-prefix", - "", - "Prefix for cluster-related nodes in zk", - ) - + addSharedFlags(replCmd, &replConfig.shared) RootCmd.AddCommand(replCmd) } func replPreRun(cmd *cobra.Command, args []string) error { - if replConfig.clusterConfig == "" && replConfig.zkAddr == "" && - replConfig.brokerAddr == "" { - return errors.New("Must set either broker-addr, cluster-config, or zk-addr") - } - if replConfig.clusterConfig != "" && - (replConfig.zkAddr != "" || replConfig.zkPrefix != "" || replConfig.brokerAddr != "") { - log.Warn("broker and zk flags are ignored when using cluster-config") - } - - return nil + return replConfig.shared.validate() } func replRun(cmd *cobra.Command, args []string) error { ctx := context.Background() sess := session.Must(session.NewSession()) - var adminClient admin.Client - var clientErr error - - if replConfig.clusterConfig != "" { - clusterConfig, err := config.LoadClusterFile(replConfig.clusterConfig) - if err != nil { - return err - } - adminClient, clientErr = clusterConfig.NewAdminClient(ctx, sess, true) - } else if replConfig.brokerAddr != "" { - adminClient, clientErr = admin.NewBrokerAdminClient( - ctx, - admin.BrokerAdminClientConfig{ - BrokerAddr: replConfig.brokerAddr, - ReadOnly: true, - }, - ) - } else { - adminClient, clientErr = admin.NewZKAdminClient( - ctx, - admin.ZKAdminClientConfig{ - ZKAddrs: []string{replConfig.zkAddr}, - ZKPrefix: replConfig.zkPrefix, - Sess: sess, - // Run in read-only mode to ensure that tailing doesn't make any changes - // in the cluster - ReadOnly: true, - }, - ) - } - - if clientErr != nil { - return clientErr + adminClient, err := replConfig.shared.getAdminClient(ctx, sess, true) + if err != nil { + return err } defer 
adminClient.Close()
 
diff --git a/cmd/topicctl/subcmd/reset.go b/cmd/topicctl/subcmd/reset.go
index aa697d56..39cc9c87 100644
--- a/cmd/topicctl/subcmd/reset.go
+++ b/cmd/topicctl/subcmd/reset.go
@@ -4,12 +4,9 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"os"
 
-	"github.com/segmentio/topicctl/pkg/admin"
 	"github.com/segmentio/topicctl/pkg/apply"
 	"github.com/segmentio/topicctl/pkg/cli"
-	"github.com/segmentio/topicctl/pkg/config"
 	"github.com/segmentio/topicctl/pkg/groups"
 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
@@ -24,29 +21,15 @@ var resetOffsetsCmd = &cobra.Command{
 }
 
 type resetOffsetsCmdConfig struct {
-	brokerAddr    string
-	clusterConfig string
-	offset        int64
-	partitions    []int
-	zkAddr        string
-	zkPrefix      string
+	offset     int64
+	partitions []int
+
+	shared sharedOptions
 }
 
 var resetOffsetsConfig resetOffsetsCmdConfig
 
 func init() {
-	resetOffsetsCmd.Flags().StringVar(
-		&resetOffsetsConfig.brokerAddr,
-		"broker-addr",
-		"",
-		"Broker address",
-	)
-	resetOffsetsCmd.Flags().StringVar(
-		&resetOffsetsConfig.clusterConfig,
-		"cluster-config",
-		os.Getenv("TOPICCTL_CLUSTER_CONFIG"),
-		"Cluster config",
-	)
 	resetOffsetsCmd.Flags().Int64Var(
 		&resetOffsetsConfig.offset,
 		"offset",
@@ -59,71 +42,22 @@ func init() {
 		[]int{},
 		"Partition (defaults to all)",
 	)
-	resetOffsetsCmd.Flags().StringVarP(
-		&resetOffsetsConfig.zkAddr,
-		"zk-addr",
-		"z",
-		"",
-		"ZooKeeper address",
-	)
-	resetOffsetsCmd.Flags().StringVar(
-		&resetOffsetsConfig.zkPrefix,
-		"zk-prefix",
-		"",
-		"Prefix for cluster-related nodes in zk",
-	)
 
+	addSharedFlags(resetOffsetsCmd, &resetOffsetsConfig.shared)
 	RootCmd.AddCommand(resetOffsetsCmd)
 }
 
 func resetOffsetsPreRun(cmd *cobra.Command, args []string) error {
-	if resetOffsetsConfig.clusterConfig == "" && resetOffsetsConfig.zkAddr == "" &&
-		resetOffsetsConfig.brokerAddr == "" {
-		return errors.New("Must set either broker-addr, cluster-config, or zk-addr")
-	}
-	if resetOffsetsConfig.clusterConfig != "" &&
-		(resetOffsetsConfig.zkAddr != "" || resetOffsetsConfig.zkPrefix != "" ||
-			resetOffsetsConfig.brokerAddr != "") {
-		log.Warn("broker and zk flags are ignored when using cluster-config")
-	}
-
-	return nil
+	return resetOffsetsConfig.shared.validate()
 }
 
 func resetOffsetsRun(cmd *cobra.Command, args []string) error {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	var adminClient admin.Client
-	var clientErr error
-
-	if resetOffsetsConfig.clusterConfig != "" {
-		clusterConfig, err := config.LoadClusterFile(resetOffsetsConfig.clusterConfig)
-		if err != nil {
-			return err
-		}
-		adminClient, clientErr = clusterConfig.NewAdminClient(ctx, nil, false)
-	} else if resetOffsetsConfig.brokerAddr != "" {
-		adminClient, clientErr = admin.NewBrokerAdminClient(
-			ctx,
-			admin.BrokerAdminClientConfig{
-				BrokerAddr: resetOffsetsConfig.brokerAddr,
-				ReadOnly:   true,
-			},
-		)
-	} else {
-		adminClient, clientErr = admin.NewZKAdminClient(
-			ctx,
-			admin.ZKAdminClientConfig{
-				ZKAddrs:  []string{resetOffsetsConfig.zkAddr},
-				ZKPrefix: resetOffsetsConfig.zkPrefix,
-				ReadOnly: false,
-			},
-		)
-	}
-
-	if clientErr != nil {
-		return clientErr
+	adminClient, err := resetOffsetsConfig.shared.getAdminClient(ctx, nil, false)
+	if err != nil {
+		return err
 	}
 	defer adminClient.Close()
diff --git a/cmd/topicctl/subcmd/shared.go b/cmd/topicctl/subcmd/shared.go
new file mode 100644
index 00000000..004c7456
--- /dev/null
+++ b/cmd/topicctl/subcmd/shared.go
@@ -0,0 +1,238 @@
+package subcmd
+
+import (
+	"context"
+	"errors"
+	"os"
+
+	"github.com/aws/aws-sdk-go/aws/session"
"github.com/hashicorp/go-multierror" + "github.com/segmentio/topicctl/pkg/admin" + "github.com/segmentio/topicctl/pkg/config" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +type sharedOptions struct { + brokerAddr string + clusterConfig string + saslMechanism string + saslPassword string + saslUsername string + tlsCACert string + tlsCert string + tlsEnabled bool + tlsKey string + tlsSkipVerify bool + tlsServerName string + zkAddr string + zkPrefix string +} + +func (s sharedOptions) validate() error { + var err error + + if s.clusterConfig == "" && s.zkAddr == "" && s.brokerAddr == "" { + err = multierror.Append( + err, + errors.New("Must set either broker-addr, cluster-config, or zk-addr"), + ) + } + + if s.zkAddr != "" && s.brokerAddr != "" { + err = multierror.Append( + err, + errors.New("Cannot set both zk-addr and broker-addr"), + ) + } + if s.clusterConfig != "" && + (s.zkAddr != "" || s.zkPrefix != "" || s.brokerAddr != "" || s.tlsCACert != "" || + s.tlsCert != "" || s.tlsKey != "" || s.saslMechanism != "") { + log.Warn("Broker and zk flags are ignored when using cluster-config") + } + + useTLS := s.tlsEnabled || s.tlsCACert != "" || s.tlsCert != "" || s.tlsKey != "" + useSASL := s.saslMechanism != "" || s.saslPassword != "" || s.saslUsername != "" + + if useTLS && s.zkAddr != "" { + log.Warn("TLS flags are ignored accessing cluster via zookeeper") + } + if useSASL && s.zkAddr != "" { + log.Warn("SASL flags are ignored accessing cluster via zookeeper") + } + + if useSASL { + if saslErr := admin.ValidateSASLMechanism(s.saslMechanism); saslErr != nil { + err = multierror.Append(err, saslErr) + } + } + + return err +} + +func (s sharedOptions) getAdminClient( + ctx context.Context, + sess *session.Session, + readOnly bool, +) (admin.Client, error) { + if s.clusterConfig != "" { + clusterConfig, err := config.LoadClusterFile(s.clusterConfig) + if err != nil { + return nil, err + } + return clusterConfig.NewAdminClient( + ctx, + sess, + true, + s.saslUsername, + s.saslPassword, + ) + } else if s.brokerAddr != "" { + tlsEnabled := (s.tlsEnabled || + s.tlsCACert != "" || + s.tlsCert != "" || + s.tlsKey != "") + saslEnabled := (s.saslMechanism != "" || + s.saslPassword != "" || + s.saslUsername != "") + return admin.NewBrokerAdminClient( + ctx, + admin.BrokerAdminClientConfig{ + ConnectorConfig: admin.ConnectorConfig{ + BrokerAddr: s.brokerAddr, + TLS: admin.TLSConfig{ + Enabled: tlsEnabled, + CACertPath: s.tlsCACert, + CertPath: s.tlsCert, + KeyPath: s.tlsKey, + ServerName: s.tlsServerName, + SkipVerify: s.tlsSkipVerify, + }, + SASL: admin.SASLConfig{ + Enabled: saslEnabled, + Mechanism: s.saslMechanism, + Password: s.saslPassword, + Username: s.saslUsername, + }, + }, + ReadOnly: readOnly, + }, + ) + } else { + return admin.NewZKAdminClient( + ctx, + admin.ZKAdminClientConfig{ + ZKAddrs: []string{s.zkAddr}, + ZKPrefix: s.zkPrefix, + Sess: sess, + // Run in read-only mode to ensure that tailing doesn't make any changes + // in the cluster + ReadOnly: readOnly, + }, + ) + } +} + +func addSharedFlags(cmd *cobra.Command, options *sharedOptions) { + cmd.Flags().StringVarP( + &options.brokerAddr, + "broker-addr", + "b", + "", + "Broker address", + ) + cmd.Flags().StringVar( + &options.clusterConfig, + "cluster-config", + os.Getenv("TOPICCTL_CLUSTER_CONFIG"), + "Cluster config", + ) + cmd.Flags().StringVar( + &options.saslMechanism, + "sasl-mechanism", + "", + "SASL mechanism if using SASL (choices: PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512)", + ) + cmd.Flags().StringVar( + 
+		&options.saslPassword,
+		"sasl-password",
+		os.Getenv("TOPICCTL_SASL_PASSWORD"),
+		"SASL password if using SASL; will override value set in cluster config",
+	)
+	cmd.Flags().StringVar(
+		&options.saslUsername,
+		"sasl-username",
+		os.Getenv("TOPICCTL_SASL_USERNAME"),
+		"SASL username if using SASL; will override value set in cluster config",
+	)
+	cmd.Flags().StringVar(
+		&options.tlsCACert,
+		"tls-ca-cert",
+		"",
+		"Path to CA cert PEM file used to verify brokers if using TLS",
+	)
+	cmd.Flags().StringVar(
+		&options.tlsCert,
+		"tls-cert",
+		"",
+		"Path to client cert PEM file if using TLS",
+	)
+	cmd.Flags().BoolVar(
+		&options.tlsEnabled,
+		"tls-enabled",
+		false,
+		"Use TLS for communication with brokers",
+	)
+	cmd.Flags().StringVar(
+		&options.tlsKey,
+		"tls-key",
+		"",
+		"Path to client private key PEM file if using TLS",
+	)
+	cmd.Flags().StringVar(
+		&options.tlsServerName,
+		"tls-server-name",
+		"",
+		"Server name to use for TLS cert verification",
+	)
+	cmd.Flags().BoolVar(
+		&options.tlsSkipVerify,
+		"tls-skip-verify",
+		false,
+		"Skip server cert and hostname verification when using TLS",
+	)
+	cmd.Flags().StringVarP(
+		&options.zkAddr,
+		"zk-addr",
+		"z",
+		"",
+		"ZooKeeper address",
+	)
+	cmd.Flags().StringVar(
+		&options.zkPrefix,
+		"zk-prefix",
+		"",
+		"Prefix for cluster-related nodes in zk",
+	)
+}
+
+func addSharedConfigOnlyFlags(cmd *cobra.Command, options *sharedOptions) {
+	cmd.Flags().StringVar(
+		&options.clusterConfig,
+		"cluster-config",
+		os.Getenv("TOPICCTL_CLUSTER_CONFIG"),
+		"Cluster config",
+	)
+	cmd.Flags().StringVar(
+		&options.saslPassword,
+		"sasl-password",
+		os.Getenv("TOPICCTL_SASL_PASSWORD"),
+		"SASL password if using SASL; will override value set in cluster config",
+	)
+	cmd.Flags().StringVar(
+		&options.saslUsername,
+		"sasl-username",
+		os.Getenv("TOPICCTL_SASL_USERNAME"),
+		"SASL username if using SASL; will override value set in cluster config",
+	)
+}
diff --git a/cmd/topicctl/subcmd/tail.go b/cmd/topicctl/subcmd/tail.go
index 88481d88..40a906c9 100644
--- a/cmd/topicctl/subcmd/tail.go
+++ b/cmd/topicctl/subcmd/tail.go
@@ -2,16 +2,13 @@ package subcmd
 
 import (
 	"context"
-	"errors"
 	"os"
 	"os/signal"
 	"strconv"
 	"syscall"
 
 	"github.com/segmentio/kafka-go"
-	"github.com/segmentio/topicctl/pkg/admin"
 	"github.com/segmentio/topicctl/pkg/cli"
-	"github.com/segmentio/topicctl/pkg/config"
 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
 )
@@ -25,30 +22,16 @@ var tailCmd = &cobra.Command{
 }
 
 type tailCmdConfig struct {
-	brokerAddr    string
-	clusterConfig string
-	offset        int64
-	partitions    []int
-	raw           bool
-	zkAddr        string
-	zkPrefix      string
+	offset     int64
+	partitions []int
+	raw        bool
+
+	shared sharedOptions
 }
 
 var tailConfig tailCmdConfig
 
 func init() {
-	tailCmd.Flags().StringVar(
-		&tailConfig.brokerAddr,
-		"broker-addr",
-		"",
-		"Broker address",
-	)
-	tailCmd.Flags().StringVar(
-		&tailConfig.clusterConfig,
-		"cluster-config",
-		os.Getenv("TOPICCTL_CLUSTER_CONFIG"),
-		"Cluster config",
-	)
 	tailCmd.Flags().Int64Var(
 		&tailConfig.offset,
 		"offset",
@@ -67,20 +50,8 @@ func init() {
 		false,
 		"Output raw values only",
 	)
-	tailCmd.Flags().StringVarP(
-		&tailConfig.zkAddr,
-		"zk-addr",
-		"z",
-		"",
-		"ZooKeeper address",
-	)
-	tailCmd.Flags().StringVar(
-		&tailConfig.zkPrefix,
-		"zk-prefix",
-		"",
-		"Prefix for cluster-related nodes in zk",
-	)
 
+	addSharedFlags(tailCmd, &tailConfig.shared)
 	RootCmd.AddCommand(tailCmd)
 }
 
@@ -89,18 +60,7 @@ func tailPreRun(cmd *cobra.Command, args []string) error {
 		// In raw mode, only log out errors
 		log.SetLevel(log.ErrorLevel)
 	}
-
-	if 
tailConfig.clusterConfig == "" && tailConfig.zkAddr == "" && - tailConfig.brokerAddr == "" { - return errors.New("Must set either broker-addr, cluster-config, or zk-addr") - } - if tailConfig.clusterConfig != "" && - (tailConfig.zkAddr != "" || tailConfig.zkPrefix != "" || - tailConfig.brokerAddr != "") { - log.Warn("broker and zk flags are ignored when using cluster-config") - } - - return nil + return tailConfig.shared.validate() } func tailRun(cmd *cobra.Command, args []string) error { @@ -114,38 +74,9 @@ func tailRun(cmd *cobra.Command, args []string) error { cancel() }() - var adminClient admin.Client - var clientErr error - - if tailConfig.clusterConfig != "" { - clusterConfig, err := config.LoadClusterFile(tailConfig.clusterConfig) - if err != nil { - return err - } - adminClient, clientErr = clusterConfig.NewAdminClient(ctx, nil, true) - } else if tailConfig.brokerAddr != "" { - adminClient, clientErr = admin.NewBrokerAdminClient( - ctx, - admin.BrokerAdminClientConfig{ - BrokerAddr: tailConfig.brokerAddr, - ReadOnly: true, - }, - ) - } else { - adminClient, clientErr = admin.NewZKAdminClient( - ctx, - admin.ZKAdminClientConfig{ - ZKAddrs: []string{tailConfig.zkAddr}, - ZKPrefix: tailConfig.zkPrefix, - // Run in read-only mode to ensure that tailing doesn't make any changes - // in the cluster - ReadOnly: true, - }, - ) - } - - if clientErr != nil { - return clientErr + adminClient, err := tailConfig.shared.getAdminClient(ctx, nil, true) + if err != nil { + return err } defer adminClient.Close() diff --git a/cmd/topicctl/subcmd/tester.go b/cmd/topicctl/subcmd/tester.go index 43c7f82c..6e760a9f 100644 --- a/cmd/topicctl/subcmd/tester.go +++ b/cmd/topicctl/subcmd/tester.go @@ -10,7 +10,6 @@ import ( "time" "github.com/segmentio/kafka-go" - "github.com/segmentio/topicctl/pkg/admin" "github.com/segmentio/topicctl/pkg/apply" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -24,23 +23,17 @@ var testerCmd = &cobra.Command{ } type testerCmdConfig struct { - brokerAddr string mode string readConsumer string topic string writeRate int - zkAddr string + + shared sharedOptions } var testerConfig testerCmdConfig func init() { - testerCmd.Flags().StringVar( - &testerConfig.brokerAddr, - "broker-addr", - "", - "Broker address", - ) testerCmd.Flags().StringVar( &testerConfig.mode, "mode", @@ -65,23 +58,14 @@ func init() { 5, "Approximate number of messages to write per sec", ) - testerCmd.Flags().StringVar( - &testerConfig.zkAddr, - "zk-addr", - "localhost:2181", - "Zookeeper address", - ) testerCmd.MarkFlagRequired("topic") - + addSharedFlags(testerCmd, &testerConfig.shared) RootCmd.AddCommand(testerCmd) } func testerPreRun(cmd *cobra.Command, args []string) error { - if testerConfig.zkAddr == "" && tailConfig.brokerAddr == "" { - return errors.New("Must set either broker-addr or zk-addr") - } - return nil + return testerConfig.shared.validate() } func testerRun(cmd *cobra.Command, args []string) error { @@ -106,15 +90,17 @@ func testerRun(cmd *cobra.Command, args []string) error { } func runTestReader(ctx context.Context) error { - brokerAddr, err := getBrokerAddr(ctx) + adminClient, err := testerConfig.shared.getAdminClient(ctx, nil, true) if err != nil { return err } + defer adminClient.Close() + connector := adminClient.GetConnector() log.Infof( "This will read test messages from the '%s' topic in %s using the consumer group ID '%s'", testerConfig.topic, - brokerAddr, + connector.Config.BrokerAddr, testerConfig.readConsumer, ) @@ -125,8 +111,9 @@ func runTestReader(ctx 
context.Context) error { reader := kafka.NewReader( kafka.ReaderConfig{ - Brokers: []string{brokerAddr}, + Brokers: []string{connector.Config.BrokerAddr}, GroupID: testerConfig.readConsumer, + Dialer: connector.Dialer, Topic: testerConfig.topic, MinBytes: 10e3, // 10KB MaxBytes: 10e6, // 10MB @@ -152,15 +139,17 @@ func runTestReader(ctx context.Context) error { } func runTestWriter(ctx context.Context) error { - brokerAddr, err := getBrokerAddr(ctx) + adminClient, err := testerConfig.shared.getAdminClient(ctx, nil, true) if err != nil { return err } + defer adminClient.Close() + connector := adminClient.GetConnector() log.Infof( "This will write test messages to the '%s' topic in %s at a rate of %d/sec.", testerConfig.topic, - brokerAddr, + connector.Config.BrokerAddr, testerConfig.writeRate, ) @@ -171,7 +160,8 @@ func runTestWriter(ctx context.Context) error { writer := kafka.NewWriter( kafka.WriterConfig{ - Brokers: []string{brokerAddr}, + Brokers: []string{connector.Config.BrokerAddr}, + Dialer: connector.Dialer, Topic: testerConfig.topic, Balancer: &kafka.LeastBytes{}, Async: true, @@ -209,20 +199,3 @@ func runTestWriter(ctx context.Context) error { } } } - -func getBrokerAddr(ctx context.Context) (string, error) { - if testerConfig.brokerAddr == "" { - adminClient, err := admin.NewZKAdminClient( - ctx, - admin.ZKAdminClientConfig{ - ZKAddrs: []string{testerConfig.zkAddr}, - }, - ) - if err != nil { - return "", err - } - return adminClient.GetBootstrapAddrs()[0], nil - } else { - return testerConfig.brokerAddr, nil - } -} diff --git a/docker-compose-auth.yml b/docker-compose-auth.yml new file mode 100644 index 00000000..16b70690 --- /dev/null +++ b/docker-compose-auth.yml @@ -0,0 +1,50 @@ +# This config sets up a simple, single-node cluster that's equipped to use SSL/TLS and/or SASL. +# It exposes access on four separate ports: +# +# 1. 9092: plaintext, no SASL +# 2. 9093: SSL, no SASL +# 3. 9094: SASL over plaintext +# 4. 9095: SASL over SSL +# +# See examples/auth for the associated cluster configs and certs. 
+version: '2' + +services: + zookeeper: + image: "wurstmeister/zookeeper:latest" + ports: + - "2181:2181" + + kafka: + image: wurstmeister/kafka:2.12-2.4.1 + restart: on-failure:3 + links: + - zookeeper + ports: + - 9092:9092 + - 9093:9093 + - 9094:9094 + - 9095:9095 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ADVERTISED_HOST_NAME: localhost + KAFKA_ADVERTISED_PORT: 9092 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_MESSAGE_MAX_BYTES: 200000000 + KAFKA_LISTENERS: "PLAINTEXT://:9092,SSL://:9093,SASL_PLAINTEXT://:9094,SASL_SSL://:9095" + KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_PLAINTEXT://localhost:9094,SASL_SSL://localhost:9095" + KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN,SCRAM-SHA-256,SCRAM-SHA-512" + KAFKA_SSL_KEYSTORE_LOCATION: /certs/kafka.keystore.jks + KAFKA_SSL_KEYSTORE_PASSWORD: test123 + KAFKA_SSL_KEY_PASSWORD: test123 + KAFKA_SSL_TRUSTSTORE_LOCATION: /certs/kafka.truststore.jks + KAFKA_SSL_TRUSTSTORE_PASSWORD: test123 + KAFKA_SSL_CLIENT_AUTH: none + KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "" + KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf" + CUSTOM_INIT_SCRIPT: |- + echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf; + /opt/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram + volumes: + - /var/run/docker.sock:/var/run/docker.sock + - ./examples/auth/certs:/certs diff --git a/docker-compose.yml b/docker-compose.yml index 0db41fc2..99a5e5c6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,9 +1,11 @@ -# By default, this docker-compose setup uses Kafka 0.10.2. This version can +# By default, this docker-compose setup uses Kafka 2.4.1. This version can # be overwritten by setting the KAFKA_IMAGE_TAG environment variable; some choices here include: # # 1. Kafka 2.6: 2.13-2.6.0 # 2. Kafka 2.5: 2.13-2.5.0 -# 3. Kafka 2.4: 2.12-2.4.1 +# 3. Kafka 0.10: 2.11-0.10.2.2 +# +# See https://hub.docker.com/r/wurstmeister/kafka/tags for the complete list. 
version: '2.1' services: zookeeper: @@ -13,7 +15,7 @@ services: # Zone 1 brokers kafka1: - image: wurstmeister/kafka:${KAFKA_IMAGE_TAG:-2.11-0.10.2.2} + image: wurstmeister/kafka:${KAFKA_IMAGE_TAG:-2.12-2.4.1} ports: - "9092:9092" environment: @@ -27,7 +29,7 @@ services: - zookeeper kafka2: - image: wurstmeister/kafka:${KAFKA_IMAGE_TAG:-2.11-0.10.2.2} + image: wurstmeister/kafka:${KAFKA_IMAGE_TAG:-2.12-2.4.1} ports: - "9093:9092" environment: @@ -43,7 +45,7 @@ services: # Zone 2 brokers kafka3: - image: wurstmeister/kafka:${KAFKA_IMAGE_TAG:-2.11-0.10.2.2} + image: wurstmeister/kafka:${KAFKA_IMAGE_TAG:-2.12-2.4.1} ports: - "9094:9092" environment: @@ -59,7 +61,7 @@ services: - kafka2 kafka4: - image: wurstmeister/kafka:${KAFKA_IMAGE_TAG:-2.11-0.10.2.2} + image: wurstmeister/kafka:${KAFKA_IMAGE_TAG:-2.12-2.4.1} ports: - "9095:9092" environment: @@ -77,7 +79,7 @@ services: # Zone 3 brokers kafka5: - image: wurstmeister/kafka:${KAFKA_IMAGE_TAG:-2.11-0.10.2.2} + image: wurstmeister/kafka:${KAFKA_IMAGE_TAG:-2.12-2.4.1} ports: - "9096:9092" environment: @@ -95,7 +97,7 @@ services: - kafka4 kafka6: - image: wurstmeister/kafka:${KAFKA_IMAGE_TAG:-2.11-0.10.2.2} + image: wurstmeister/kafka:${KAFKA_IMAGE_TAG:-2.12-2.4.1} ports: - "9097:9092" environment: diff --git a/examples/auth/certs/ca.crt b/examples/auth/certs/ca.crt new file mode 100644 index 00000000..563c9e0d --- /dev/null +++ b/examples/auth/certs/ca.crt @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDKDCCAhACCQCxcVKD5FwhXjANBgkqhkiG9w0BAQsFADBWMQswCQYDVQQGEwJV +UzELMAkGA1UECAwCQ0ExCzAJBgNVBAcMAlNGMRAwDgYDVQQKDAdTZWdtZW50MRsw +GQYDVQQDDBJ1c2VyLnNlZ21lbnQubG9jYWwwHhcNMjAxMTI0MDI1ODIyWhcNMzAx +MTIyMDI1ODIyWjBWMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0ExCzAJBgNVBAcM +AlNGMRAwDgYDVQQKDAdTZWdtZW50MRswGQYDVQQDDBJ1c2VyLnNlZ21lbnQubG9j +YWwwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC4ayiwKN/iS06RJwOL +Bey75INaq92gsPT8yOI2/u43hqO7wiC6AHnf1nais/4P1zUuS6WZeA5rUsJPzhKC +N6fYFNkEMA5ui7LoEjJqD6o4Bw1cxWvQ/+Y9GwDOdK6T/q/ZSu9W7TB/Lgi0dT3C +SNfr/KnBDwsSUjEV7WP84qfbikUInPx7doFTm/pa6J44LvvdLH3qBqdiWdjKP/K3 +9mzADbNAQeLReGBEouaVBdIccDVoNfcG1/f+DDcupCjzFMUM8Hu5991a7Z7f1A1T +FalZmbk+THiP/4lliQgKfvhfVryVmq8YVsPPEiyy9biF/qtrJKFMbxsNn2eqP9ED +ZRCnAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAHnHNiOQ02HfJXcLVAAJFMrfe2a8 +cfoD58oB0xuXEjHAHqR4iwp01S8fssNw82RdIxymPD8nfOkyRnd5g+gYgTOJsTL9 +ed3UGIH8dSel367z5dxYL3fVGZttqUutDH60TtUVc+qS1W+Vt6pEexETDo5PYj/D +Ho4f3YoORg+w4V0ZUCX3mgNYxyJULuPLBeYn4bXnplCcxG9rZgxMtIOhMnLlYdtn +nJbiruRmReJk+Ifkx//38YkYFjxWuztdWmF3nVotyUNCAV0cb5E/0w8nLwrR8JZr +ychNKuBcy92RJ7XYFMytI4PqisXiQ/VP5R0tpjVxLMAonF7hWj7n+5LSM8I= +-----END CERTIFICATE----- diff --git a/examples/auth/certs/ca.key b/examples/auth/certs/ca.key new file mode 100644 index 00000000..93b51fb1 --- /dev/null +++ b/examples/auth/certs/ca.key @@ -0,0 +1,30 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFHzBJBgkqhkiG9w0BBQ0wPDAbBgkqhkiG9w0BBQwwDgQIDqzKTLtzeZcCAggA +MB0GCWCGSAFlAwQBKgQQkYvJZrDaFnldNi2Wr53f9wSCBND9mXn1ii3nsPieqy07 +7Gl3iZ0N18jRJfqsbAcbj31Vktcp+dhMLFbz51iFBTqq7xESUFrZEaGSpwqGrLV7 +NrvkDgq2khqQGh1dlCbYYyBfq2vyxgpeISxrHCNUtfsBpT6x/r8lLjxrjD2m/PjR +J80ZSfV1KpaSEP4HPR01mC3EwF+wdwK3NYJWkk0PcKKZVhXVFg6v3x6RJhnU7jW1 +aKnrT/GWdrhSqTZBvrrW4+x0wr/Iygw/MxuO70k4fpYWfrcLIEs9Ia5RVpFS1lXz ++DRBLZnjsyv7BXSeDRBEWcx1pb5CGuwgQ4yRx2xK2lRReX6lIE53YLD/1nK8xSzS +5JorKefkwsHhqgfRrzZu6aLMe7f7PJmrfQ9L4dLc27MC7HIrBnA2RZLfHkp9hGOP +73H5U6s6OhvFC81ECh3+ejHVk+Zb6Rx6Xi/tOPoPNLNiEXaLP9jpCo1uLqnvRdpG +PyjpuP0goOb6u/1HU7Nfxqyxf4LVPb0DTwnooXPveJr9czhuboYjhCk48cheTYy1 
+kiCym2aKQv62ZxiCVVcHuoCXTbRZKAUtfYG3CQBn3a2Y0I2r1gODeMokpNHuSA9L +htuOuXx5SYKWxSeGsT0FePddHOBWRTOw3OB9fj95/vHVs7Tkiw6Uj9Sr/Q7rtm2O +KuLBw9QvZKSzpJlZ3nKaylWSXJp6zOx32UsrK+XQhL3UCbS2BkZxA5s1VVIg9p4+ +c0gqflIDvoBHM4elLZ/X2AV0lsdfNvyPx+wmI4nW5sazPbeIXY/MzOv7KwSeZkea +phGufA0ioEQUlyJ+P5iDOsaD1uGH7rdyIOGWnaP6jiGAkQ6IoorLSMvzdCHysuZp +ZziOfLL8ydhxpkma4VK1PYHFNlk72T/qeHDRr110osvMiE2DuMQ9RPbLR4hrrEDM +WUx4fgnpCqDDx2/1YScXioMY4BwEHliS7hZlv27ODWRO23YdQSB/e//yh9VBQnGS +dFw1dmtu1XDe+78l/YbuSzE3ppYO6VAMRiFgrWUubZgS5qmzAHn0i68ERnlK7j9s +jQYGltuHetrMnW7aBsV+TU3QsOqWrF9hL1geydwKezXGYkEURY69zLrlVkNxhFtQ +VJNRo9YKiFrdE/94qrhjyNZOwmq/3vs2+XEUGhCCp+Pfxtg8jJNSuEoXKWaaTwtS +fOae6wtbeydQG6iVYz7Mdm/Sc2xKRm3aVz4mkCxcfdyKmP8W0Yiw+6GniNR1Ryfu +gyr2LN44MaPH5fazYVPapocm1xK4HDVAHjFLzfzWbYde8fzf21LWWad5pwsClimL +4GfkFp0KGfajlcDMquJWn8ILU41unXo3Q/ikV3w8HEr6KydqRYXDctcm6WAx/8NP +8YXP9NEcq8FCKqjGlenjSwo1YfUJnmDEglpX8gCdYzEe0n6aEq2TfY8AsZ/rzls6 +BxRcnR7U3zR2maDr4gAYo7WR2pGe9WbgT+arwtBValmDI5C2sAu83MB6ubGA/lDR +M+DhLy69s4QeKI/X35bUVY3Qjm09Zxj4NOgVZwWV6mEAyMMLyKRF+NPnBdMSxQRe +u6Jpt4Ig1RUXMRNSv3u86W/PFmS7gAQEBMYL2rVrSeF8Vlb/uVPC31fUFDmFfQ3x +nPWzjMvCjivfzVGp5vo/Sd6OLw== +-----END ENCRYPTED PRIVATE KEY----- diff --git a/examples/auth/certs/client.crt b/examples/auth/certs/client.crt new file mode 100644 index 00000000..e6b1dace --- /dev/null +++ b/examples/auth/certs/client.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDNzCCAh8CCQCBQTsa+CfoGTANBgkqhkiG9w0BAQUFADBWMQswCQYDVQQGEwJV +UzELMAkGA1UECAwCQ0ExCzAJBgNVBAcMAlNGMRAwDgYDVQQKDAdTZWdtZW50MRsw +GQYDVQQDDBJ1c2VyLnNlZ21lbnQubG9jYWwwHhcNMjAxMTI0MDI1OTQxWhcNMzAx +MTIyMDI1OTQxWjBlMQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExCzAJBgNVBAcT +AlNGMRAwDgYDVQQKEwdTZWdtZW50MRAwDgYDVQQLEwdTZWdtZW50MRgwFgYDVQQD +Ew9CZW5qYW1pbiBZb2xrZW4wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIB +AQCdQR9sA4Rv7sPzBGO5FMTSq7ULxvCEIONE2zppv7DqOCmz6Lq2//7tzCeSNt7p +vh5HH9tpwIN5b8kwoppMKGOVPTmfdbTXXwsSP3JPZJnPSFdoElBh/qnhoKhF6tLD +um3fZFbZ66KintcG5/9PgluanOr4WuakW3YBs9SLwEY28ZvkeBcwfSZENFh74dr6 +alkfn+u+0zhyRVvbT6A0gr2zeRUb35UAS5Vlgc5zI65v+d7TT7OaIHkL8rnrsMft +DqBlPhTzlfGz64ipFgFQuTHuFWg5jenwKuLaDq2tMawRLaM0ZZCAu7Tmlmq429VD +CP1wEq1DKUWQ7lR1me9Cft+3AgMBAAEwDQYJKoZIhvcNAQEFBQADggEBAIntZGmd +1tSxOfgqGz3vQSqoYhC9tsEWbwGnez7c2WLJE3vR6pHB06iWqegf78DAWkz1ZI+d +1OPYfqPw63sOZYsHIoug5xR8QtU8G3NaT1H7Vc7GiIkU+PIn0V7DzBSqEZWuoTz3 +XUbH25O5ynMKyGR6zirRZLDL1lw1dDKeqbaUPt/QxuY1S6Pl+36C2DOBDWqJJWQJ +rtxJ4zFA+ZJEK1EKIJF7ufM0qfCCnKTnvo/4SLGItPrmp/xUFl5T4ises/uLqrY4 +fWbpV8hbuPo5f50AD6S1Iw7H1ZVKyHazBIYaC0QS8Vk4vp5I6J7OnuR+jhv0y0vr +Bd/jWLKcAv0MzjQ= +-----END CERTIFICATE----- diff --git a/examples/auth/certs/client.key b/examples/auth/certs/client.key new file mode 100644 index 00000000..d788fc63 --- /dev/null +++ b/examples/auth/certs/client.key @@ -0,0 +1,29 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCdQR9sA4Rv7sPz +BGO5FMTSq7ULxvCEIONE2zppv7DqOCmz6Lq2//7tzCeSNt7pvh5HH9tpwIN5b8kw +oppMKGOVPTmfdbTXXwsSP3JPZJnPSFdoElBh/qnhoKhF6tLDum3fZFbZ66KintcG +5/9PgluanOr4WuakW3YBs9SLwEY28ZvkeBcwfSZENFh74dr6alkfn+u+0zhyRVvb +T6A0gr2zeRUb35UAS5Vlgc5zI65v+d7TT7OaIHkL8rnrsMftDqBlPhTzlfGz64ip +FgFQuTHuFWg5jenwKuLaDq2tMawRLaM0ZZCAu7Tmlmq429VDCP1wEq1DKUWQ7lR1 +me9Cft+3AgMBAAECggEADqdR4UPWpIOQWOXw0P9hc+wyO723DejupKz1HYOSXdEL ++crXE1R5kfkzOsnILenccm5CiPE6jydejRyp2iztUqvY4cYbKvKdWn71DPbn6kvo +cTc7rFYJyI+q/pDqQPjvYiC8gyQVDKhWizs1LFiOZrL2plv6IBixv2jdhoRNRrNI +65N6Z9wHTNztcJyKDKT6Z4pgBDYSWzUIAGVfw2vhYwDavgiSTAOksGc15rmamc0+ +ZYaDpwfbFEI06jBIJEBzcfdp/0FtNQSTL0FDkXdQyBhc6IxNHIifL5dDvhMo0Aks 
+dcnlBf13DVskLe7lVxOPuXoljC6Us1WkWJ2VWJ6RYQKBgQDyvxqInnqVS2AK8t+X
+q7w4yccv+2exPM5gs9Bht9C/e/OPb39bSAdjLOJOsTAkkBQfTyrawEqUGFsq5y1B
+U4SljPJchXfL82pefXfIHCtqhv8ieJb2Tby50PQzeY2+pw8/0xgbVjTAr4JKRXO0
+hyMEOj6ElKFnjHlpqSuCl5hGsQKBgQCl1xX1uBimphJJ3Zo74UUop87GeFbWl4oL
+pwqI3DOy5KMrbRtWqU3EI8BkjgjHJLF3d4evVAIoBG54fZFjHSPgq3163WUuU6ZW
+p1gvHTWyTNbuu8IFqqhbE/vHUZMfX7ksuBooFOsY0Wqq0poVgDWriFS5SfYek43f
+WAZiAvn25wKBgFAd3qYEmDS6AeLbMgye86pSfllJwnlutjaYYkg+ILlyMXq/s+ru
+pPGImNCcDmWi3+FNgbldCcBDIaPRVNBgvkDdeggrTNSVbB/vjR8QnQu1rnM0Fa8J
+DSbO3io3Dh9Eh/Xqt+Qd2Z9WzcuxjHSivV3h00xyuaqxZEkJOoEJg4qhAoGADOND
+JJ5S5BiB0VW0V7Tw7/Dig8/0R6btJmyrx+j854kXGRfYiQqNLZHtsKLNEdTLKdKT
+K8/mfv+hKiHv+3jXQe1xyeuMomYDxjYpBzhI5PtNtK3IrTIO9Uz/QwUW3thMhqoj
+9jtx7bLQjEfji4o0IYlttByIUOX8n3+yt0kt7b8CgYEA0NMkFD39NBYv4Ec5fx7y
+bNcObdI35cyw5xt2uSqC/2fcqLeD8FKlsS9b9JkkhYvEEe1z/3vMuO7erpQuh5j6
+P9gO/TcaZOk9DACoI2hEfUlW4SX2rLPbPB0ZG61PStjeyf1uy78jCVQYjM7N+WBE
+IY2Y+bXLOjn24ooplcIlZsw=
+-----END PRIVATE KEY-----
+
diff --git a/examples/auth/certs/kafka.keystore.jks b/examples/auth/certs/kafka.keystore.jks
new file mode 100644
index 00000000..c293b952
Binary files /dev/null and b/examples/auth/certs/kafka.keystore.jks differ
diff --git a/examples/auth/certs/kafka.truststore.jks b/examples/auth/certs/kafka.truststore.jks
new file mode 100644
index 00000000..432c94af
Binary files /dev/null and b/examples/auth/certs/kafka.truststore.jks differ
diff --git a/examples/auth/cluster.yaml b/examples/auth/cluster.yaml
new file mode 100644
index 00000000..07b097c9
--- /dev/null
+++ b/examples/auth/cluster.yaml
@@ -0,0 +1,29 @@
+meta:
+  name: local-cluster-auth
+  environment: local-env
+  region: local-region
+  description: |
+    Test cluster that uses SSL/TLS and SASL to securely connect to brokers. Can be run
+    against compose setup defined in docker-compose-auth.yml in the repo root.
+
+spec:
+  bootstrapAddrs:
+    # To use just TLS without SASL, switch to port 9093 and disable SASL in the config below.
+    # To use just SASL without TLS, switch to port 9094 and disable TLS in the config below.
+    - localhost:9095
+  tls:
+    enabled: true
+    caCertPath: certs/ca.crt
+    certPath: certs/client.crt
+    keyPath: certs/client.key
+    skipVerify: true
+  sasl:
+    enabled: true
+    mechanism: SCRAM-SHA-512
+
+    # As an alternative to storing these in the config (probably not super-secure), these can be
+    # set by using the --sasl-username and --sasl-password flags or the
+    # TOPICCTL_SASL_USERNAME and TOPICCTL_SASL_PASSWORD environment variables when running
+    # topicctl.
+    username: adminscram
+    password: admin-secret-512
diff --git a/examples/auth/topics/topic-default.yaml b/examples/auth/topics/topic-default.yaml
new file mode 100644
index 00000000..d2555bd8
--- /dev/null
+++ b/examples/auth/topics/topic-default.yaml
@@ -0,0 +1,17 @@
+meta:
+  name: topic-default
+  cluster: local-cluster-auth
+  environment: local-env
+  region: local-region
+  description: |
+    Topic that uses default (any) strategy for assigning partition brokers.
+
+spec:
+  partitions: 3
+  replicationFactor: 1
+  retentionMinutes: 100
+  placement:
+    strategy: any
+  settings:
+    cleanup.policy: delete
+    max.message.bytes: 5542880
diff --git a/examples/local-cluster/cluster.yaml b/examples/local-cluster/cluster.yaml
index d6d78d6e..ca142900 100644
--- a/examples/local-cluster/cluster.yaml
+++ b/examples/local-cluster/cluster.yaml
@@ -3,13 +3,16 @@ meta:
   environment: local-env
   region: local-region
   description: |
-    Test cluster
+    Test cluster that uses plaintext access to brokers. 
Can be run against compose setup defined + in docker-compose.yml in the repo root. spec: - versionMajor: v2 bootstrapAddrs: - localhost:9092 - zkAddrs: - - localhost:2181 - zkLockPath: /topicctl/locks - useBrokerAdmin: true + + # Uncomment these lines to access cluster via ZooKeeper instead of broker APIs (required + # for older cluster versions). + # + # zkAddrs: + # - localhost:2181 + # zkLockPath: /topicctl/locks diff --git a/go.mod b/go.mod index 43a93604..786a24ac 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( // This is a draft kafka-go version that is not merged into master of that // repo yet. - github.com/segmentio/kafka-go v0.4.9-0.20201120053500-98b10c7c5631 + github.com/segmentio/kafka-go v0.4.9-0.20201125211318-63239f1766cc github.com/sirupsen/logrus v1.2.0 github.com/spf13/cobra v1.0.0 diff --git a/go.sum b/go.sum index 77435739..8a630808 100644 --- a/go.sum +++ b/go.sum @@ -153,6 +153,8 @@ github.com/segmentio/kafka-go v0.4.9-0.20201119185034-ed175e9082b6 h1:kkk2CI7oly github.com/segmentio/kafka-go v0.4.9-0.20201119185034-ed175e9082b6/go.mod h1:Inh7PqOsxmfgasV8InZYKVXWsdjcCq2d9tFV75GLbuM= github.com/segmentio/kafka-go v0.4.9-0.20201120053500-98b10c7c5631 h1:BaADAfG3xFHYQ9CZ4kZITBSosvg9Wz0PLhxQFFDQvTI= github.com/segmentio/kafka-go v0.4.9-0.20201120053500-98b10c7c5631/go.mod h1:Inh7PqOsxmfgasV8InZYKVXWsdjcCq2d9tFV75GLbuM= +github.com/segmentio/kafka-go v0.4.9-0.20201125211318-63239f1766cc h1:hwPom0mtHZTB+96qMCt8QxjhDHZZ15+4eGuM0DdCKUY= +github.com/segmentio/kafka-go v0.4.9-0.20201125211318-63239f1766cc/go.mod h1:Inh7PqOsxmfgasV8InZYKVXWsdjcCq2d9tFV75GLbuM= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= @@ -221,6 +223,7 @@ golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= diff --git a/pkg/admin/brokerclient.go b/pkg/admin/brokerclient.go index 081ffbd1..f41266f4 100644 --- a/pkg/admin/brokerclient.go +++ b/pkg/admin/brokerclient.go @@ -19,33 +19,36 @@ const ( // BrokerAdminClient is a Client implementation that only uses broker APIs, without any // zookeeper access. 
 type BrokerAdminClient struct {
-	brokerAddr        string
 	client            *kafka.Client
-	readOnly          bool
+	connector         *Connector
+	config            BrokerAdminClientConfig
 	supportedFeatures SupportedFeatures
 }
 
 var _ Client = (*BrokerAdminClient)(nil)
 
 type BrokerAdminClientConfig struct {
-	BrokerAddr string
-	ReadOnly   bool
+	ConnectorConfig
+	ReadOnly bool
 }
 
 func NewBrokerAdminClient(
 	ctx context.Context,
 	config BrokerAdminClientConfig,
 ) (*BrokerAdminClient, error) {
-	client := &kafka.Client{
-		Addr: kafka.TCP(config.BrokerAddr),
+	connector, err := NewConnector(config.ConnectorConfig)
+	if err != nil {
+		return nil, err
 	}
+	client := connector.KafkaClient
+
 	log.Debugf("Getting supported API versions")
 	apiVersions, err := client.ApiVersions(ctx, kafka.ApiVersionsRequest{})
 	if err != nil {
 		return nil, err
 	}
 	log.Debugf("Supported API versions: %+v", apiVersions)
 
-	maxVersions := map[string]int16{}
+	maxVersions := map[string]int{}
 	for _, apiKey := range apiVersions.ApiKeys {
 		maxVersions[apiKey.ApiName] = apiKey.MaxVersion
 	}
@@ -80,9 +83,9 @@ func NewBrokerAdminClient(
 	log.Debugf("Supported features: %+v", supportedFeatures)
 
 	return &BrokerAdminClient{
-		brokerAddr:        config.BrokerAddr,
 		client:            client,
-		readOnly:          config.ReadOnly,
+		connector:         connector,
+		config:            config,
 		supportedFeatures: supportedFeatures,
 	}, nil
 }
@@ -164,8 +167,8 @@ func (c *BrokerAdminClient) GetBrokerIDs(ctx context.Context) ([]int, error) {
 	return brokerIDs, nil
 }
 
-func (c *BrokerAdminClient) GetBootstrapAddrs() []string {
-	return []string{c.brokerAddr}
+func (c *BrokerAdminClient) GetConnector() *Connector {
+	return c.connector
 }
 
 func (c *BrokerAdminClient) GetTopics(
@@ -283,7 +286,7 @@ func (c *BrokerAdminClient) UpdateTopicConfig(
 	configEntries []kafka.ConfigEntry,
 	overwrite bool,
 ) ([]string, error) {
-	if c.readOnly {
+	if c.config.ReadOnly {
 		return nil, errors.New("Cannot update topic config in read-only mode")
 	}
@@ -314,7 +317,7 @@ func (c *BrokerAdminClient) UpdateBrokerConfig(
 	configEntries []kafka.ConfigEntry,
 	overwrite bool,
 ) ([]string, error) {
-	if c.readOnly {
+	if c.config.ReadOnly {
 		return nil, errors.New("Cannot update broker config in read-only mode")
 	}
@@ -343,7 +346,7 @@ func (c *BrokerAdminClient) CreateTopic(
 	ctx context.Context,
 	config kafka.TopicConfig,
 ) error {
-	if c.readOnly {
+	if c.config.ReadOnly {
 		return errors.New("Cannot create topic in read-only mode")
 	}
@@ -362,21 +365,15 @@ func (c *BrokerAdminClient) AssignPartitions(
 	topic string,
 	assignments []PartitionAssignment,
 ) error {
-	if c.readOnly {
+	if c.config.ReadOnly {
 		return errors.New("Cannot assign partitions in read-only mode")
 	}
 
 	apiAssignments := []kafka.AlterPartitionReassignmentsRequestAssignment{}
 	for _, assignment := range assignments {
-		replicas := []int32{}
-
-		for _, replica := range assignment.Replicas {
-			replicas = append(replicas, int32(replica))
-		}
-
 		apiAssignment := kafka.AlterPartitionReassignmentsRequestAssignment{
-			PartitionID: int32(assignment.ID),
-			BrokerIDs:   replicas,
+			PartitionID: assignment.ID,
+			BrokerIDs:   assignment.Replicas,
 		}
 		apiAssignments = append(apiAssignments, apiAssignment)
 	}
@@ -398,7 +395,7 @@ func (c *BrokerAdminClient) AddPartitions(
 	topic string,
 	newAssignments []PartitionAssignment,
 ) error {
-	if c.readOnly {
+	if c.config.ReadOnly {
 		return errors.New("Cannot add partitions in read-only mode")
 	}
@@ -409,15 +406,10 @@ func (c *BrokerAdminClient) AddPartitions(
 	partitions := []kafka.CreatePartitionsRequestPartition{}
 
 	for _, newAssignment := range newAssignments {
-		replicas := []int32{}
-		for _, replica := range newAssignment.Replicas {
-			replicas = append(replicas, int32(replica))
-		}
-
 		partitions = append(
 			partitions,
 			kafka.CreatePartitionsRequestPartition{
-				BrokerIDs: replicas,
+				BrokerIDs: newAssignment.Replicas,
 			},
 		)
 	}
@@ -425,7 +417,7 @@ func (c *BrokerAdminClient) AddPartitions(
 	req := kafka.CreatePartitionsRequest{
 		Topic:         topic,
 		NewPartitions: partitions,
-		TotalCount:    int32(len(partitions) + len(topicInfo.Partitions)),
+		TotalCount:    len(partitions) + len(topicInfo.Partitions),
 		Timeout:       defaultTimeout,
 	}
 	log.Debugf("CreatePartitions request: %+v", req)
@@ -441,18 +433,13 @@ func (c *BrokerAdminClient) RunLeaderElection(
 	topic string,
 	partitions []int,
 ) error {
-	if c.readOnly {
+	if c.config.ReadOnly {
 		return errors.New("Cannot run leader election in read-only mode")
 	}
 
-	partitionsInt32 := []int32{}
-	for _, partition := range partitions {
-		partitionsInt32 = append(partitionsInt32, int32(partition))
-	}
-
 	req := kafka.ElectLeadersRequest{
 		Topic:      topic,
-		Partitions: partitionsInt32,
+		Partitions: partitions,
 		Timeout:    defaultTimeout,
 	}
 	log.Debugf("ElectLeaders request: %+v", req)
diff --git a/pkg/admin/brokerclient_test.go b/pkg/admin/brokerclient_test.go
index 0933ad9c..3c5de41c 100644
--- a/pkg/admin/brokerclient_test.go
+++ b/pkg/admin/brokerclient_test.go
@@ -20,7 +20,11 @@ func TestBrokerClientGetClusterID(t *testing.T) {
 	ctx := context.Background()
 	client, err := NewBrokerAdminClient(
 		ctx,
-		BrokerAdminClientConfig{BrokerAddr: util.TestKafkaAddr()},
+		BrokerAdminClientConfig{
+			ConnectorConfig: ConnectorConfig{
+				BrokerAddr: util.TestKafkaAddr(),
+			},
+		},
 	)
 	require.NoError(t, err)
 
@@ -37,7 +41,11 @@ func TestBrokerClientUpdateTopicConfig(t *testing.T) {
 	ctx := context.Background()
 	client, err := NewBrokerAdminClient(
 		ctx,
-		BrokerAdminClientConfig{BrokerAddr: util.TestKafkaAddr()},
+		BrokerAdminClientConfig{
+			ConnectorConfig: ConnectorConfig{
+				BrokerAddr: util.TestKafkaAddr(),
+			},
+		},
 	)
 	require.NoError(t, err)
 
@@ -139,7 +147,11 @@ func TestBrokerClientBrokers(t *testing.T) {
 	ctx := context.Background()
 	client, err := NewBrokerAdminClient(
 		ctx,
-		BrokerAdminClientConfig{BrokerAddr: util.TestKafkaAddr()},
+		BrokerAdminClientConfig{
+			ConnectorConfig: ConnectorConfig{
+				BrokerAddr: util.TestKafkaAddr(),
+			},
+		},
 	)
 	require.NoError(t, err)
 
@@ -258,7 +270,11 @@ func TestBrokerClientAddPartitions(t *testing.T) {
 	ctx := context.Background()
 	client, err := NewBrokerAdminClient(
 		ctx,
-		BrokerAdminClientConfig{BrokerAddr: util.TestKafkaAddr()},
+		BrokerAdminClientConfig{
+			ConnectorConfig: ConnectorConfig{
+				BrokerAddr: util.TestKafkaAddr(),
+			},
+		},
 	)
 	require.NoError(t, err)
 
@@ -322,7 +338,11 @@ func TestBrokerClientAlterAssignments(t *testing.T) {
 	ctx := context.Background()
 	client, err := NewBrokerAdminClient(
 		ctx,
-		BrokerAdminClientConfig{BrokerAddr: util.TestKafkaAddr()},
+		BrokerAdminClientConfig{
+			ConnectorConfig: ConnectorConfig{
+				BrokerAddr: util.TestKafkaAddr(),
+			},
+		},
 	)
 	require.NoError(t, err)
 
@@ -410,7 +430,11 @@ func TestBrokerClientRunLeaderElection(t *testing.T) {
 	ctx := context.Background()
 	client, err := NewBrokerAdminClient(
 		ctx,
-		BrokerAdminClientConfig{BrokerAddr: util.TestKafkaAddr()},
+		BrokerAdminClientConfig{
+			ConnectorConfig: ConnectorConfig{
+				BrokerAddr: util.TestKafkaAddr(),
+			},
+		},
 	)
 	require.NoError(t, err)
 
@@ -447,7 +471,11 @@ func TestBrokerClientGetApiVersions(t *testing.T) {
 	ctx := context.Background()
 	client, err := NewBrokerAdminClient(
 		ctx,
-		BrokerAdminClientConfig{BrokerAddr: util.TestKafkaAddr()},
+		BrokerAdminClientConfig{
+			ConnectorConfig: ConnectorConfig{
+				BrokerAddr: util.TestKafkaAddr(),
+			},
+		},
 	)
 	require.NoError(t, err)
 
diff --git a/pkg/admin/client.go b/pkg/admin/client.go
index e01bffb4..e6f884db 100644
--- a/pkg/admin/client.go
+++ b/pkg/admin/client.go
@@ -12,7 +12,7 @@ type Client interface {
 	GetClusterID(ctx context.Context) (string, error)
 	GetBrokers(ctx context.Context, ids []int) ([]BrokerInfo, error)
 	GetBrokerIDs(ctx context.Context) ([]int, error)
-	GetBootstrapAddrs() []string
+	GetConnector() *Connector
 	GetTopics(
 		ctx context.Context,
 		names []string,
diff --git a/pkg/admin/connector.go b/pkg/admin/connector.go
new file mode 100644
index 00000000..a316656b
--- /dev/null
+++ b/pkg/admin/connector.go
@@ -0,0 +1,159 @@
+package admin
+
+import (
+	"crypto/tls"
+	"crypto/x509"
+	"fmt"
+	"io/ioutil"
+	"strings"
+	"time"
+
+	"github.com/segmentio/kafka-go"
+	"github.com/segmentio/kafka-go/sasl"
+	"github.com/segmentio/kafka-go/sasl/plain"
+	"github.com/segmentio/kafka-go/sasl/scram"
+	log "github.com/sirupsen/logrus"
+)
+
+type ConnectorConfig struct {
+	BrokerAddr string
+	TLS        TLSConfig
+	SASL       SASLConfig
+}
+
+type TLSConfig struct {
+	Enabled    bool
+	CertPath   string
+	KeyPath    string
+	CACertPath string
+	ServerName string
+	SkipVerify bool
+}
+
+type SASLConfig struct {
+	Enabled   bool
+	Mechanism string
+	Username  string
+	Password  string
+}
+
+type Connector struct {
+	Config      ConnectorConfig
+	Dialer      *kafka.Dialer
+	KafkaClient *kafka.Client
+}
+
+func NewConnector(config ConnectorConfig) (*Connector, error) {
+	connector := &Connector{
+		Config: config,
+	}
+
+	var saslMechanism sasl.Mechanism
+	var tlsConfig *tls.Config
+	var err error
+
+	if config.SASL.Enabled {
+		switch strings.ToLower(config.SASL.Mechanism) {
+		case "plain":
+			saslMechanism = plain.Mechanism{
+				Username: config.SASL.Username,
+				Password: config.SASL.Password,
+			}
+		case "scram-sha-256":
+			saslMechanism, err = scram.Mechanism(
+				scram.SHA256,
+				config.SASL.Username,
+				config.SASL.Password,
+			)
+			if err != nil {
+				return nil, err
+			}
+		case "scram-sha-512":
+			saslMechanism, err = scram.Mechanism(
+				scram.SHA512,
+				config.SASL.Username,
+				config.SASL.Password,
+			)
+			if err != nil {
+				return nil, err
+			}
+		default:
+			return nil, fmt.Errorf("Unrecognized SASL mechanism: %s", config.SASL.Mechanism)
+		}
+	}
+
+	if !config.TLS.Enabled {
+		// Attach the SASL mechanism (if any) here too, so that non-TLS,
+		// dialer-based connections (tail, groups) still authenticate.
+		connector.Dialer = &kafka.Dialer{
+			SASLMechanism: saslMechanism,
+			Timeout:       10 * time.Second,
+		}
+	} else {
+		var certs []tls.Certificate
+		var caCertPool *x509.CertPool
+
+		if config.TLS.CertPath != "" && config.TLS.KeyPath != "" {
+			log.Debugf(
+				"Loading key pair from %s and %s",
+				config.TLS.CertPath,
+				config.TLS.KeyPath,
+			)
+			cert, err := tls.LoadX509KeyPair(config.TLS.CertPath, config.TLS.KeyPath)
+			if err != nil {
+				return nil, err
+			}
+			certs = append(certs, cert)
+		}
+
+		if config.TLS.CACertPath != "" {
+			log.Debugf("Adding CA certs from %s", config.TLS.CACertPath)
+			caCertPool = x509.NewCertPool()
+			caCertContents, err := ioutil.ReadFile(config.TLS.CACertPath)
+			if err != nil {
+				return nil, err
+			}
+			if ok := caCertPool.AppendCertsFromPEM(caCertContents); !ok {
+				return nil, fmt.Errorf(
+					"Could not append CA certs from %s",
+					config.TLS.CACertPath,
+				)
+			}
+		}
+
+		tlsConfig = &tls.Config{
+			Certificates:       certs,
+			RootCAs:            caCertPool,
+			InsecureSkipVerify: config.TLS.SkipVerify,
+			ServerName:         config.TLS.ServerName,
+		}
+		connector.Dialer = &kafka.Dialer{
+			SASLMechanism: saslMechanism,
+			Timeout:       10 * time.Second,
+			TLS:           tlsConfig,
+		}
+	}
+
+	log.Debugf("Connecting to cluster on address %s with TLS enabled=%v, SASL enabled=%v",
+		config.BrokerAddr,
+		config.TLS.Enabled,
+		config.SASL.Enabled,
+	)
+	connector.KafkaClient = &kafka.Client{
+		Addr: kafka.TCP(config.BrokerAddr),
+		Transport: &kafka.Transport{
+			Dial: connector.Dialer.DialFunc,
+			SASL: saslMechanism,
+			TLS:  tlsConfig,
+		},
+	}
+
+	return connector, nil
+}
+
+func ValidateSASLMechanism(mechanism string) error {
+	switch strings.ToLower(mechanism) {
+	case "plain", "scram-sha-256", "scram-sha-512":
+		return nil
+	default:
+		return fmt.Errorf(
+			"SASL mechanism '%s' is not valid; choices are PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512",
+			mechanism,
+		)
+	}
+}
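To make the new connection layer concrete, here is a minimal usage sketch of the Connector added above. It assumes a reachable broker with SCRAM enabled; the address and credentials are placeholders, and the calls mirror ones used elsewhere in this diff:

    // Sketch only: construct a Connector and exercise both access paths.
    ctx := context.Background()
    connector, err := admin.NewConnector(admin.ConnectorConfig{
        BrokerAddr: "localhost:9092", // placeholder address
        SASL: admin.SASLConfig{
            Enabled:   true,
            Mechanism: "SCRAM-SHA-512",
            Username:  "my-username", // placeholder credentials
            Password:  "my-password",
        },
    })
    if err != nil {
        log.Fatal(err)
    }

    // Request/response APIs share the wrapped kafka.Client...
    apiVersions, err := connector.KafkaClient.ApiVersions(ctx, kafka.ApiVersionsRequest{})
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("broker supports %d APIs", len(apiVersions.ApiKeys))

    // ...while reader/writer-style code shares the wrapped kafka.Dialer.
    conn, err := connector.Dialer.DialContext(ctx, "tcp", connector.Config.BrokerAddr)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()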
diff --git a/pkg/admin/zkclient.go b/pkg/admin/zkclient.go
index 7eb26335..20fce7ad 100644
--- a/pkg/admin/zkclient.go
+++ b/pkg/admin/zkclient.go
@@ -47,6 +47,7 @@ type ZKAdminClient struct {
 	zkClient       zk.Client
 	zkPrefix       string
 	bootstrapAddrs []string
+	Connector      *Connector
 	sess           *session.Session
 	readOnly       bool
 }
@@ -132,6 +133,11 @@ func NewZKAdminClient(
 	}
 	client.bootstrapAddrs = bootstrapAddrs
 
+	client.Connector, err = NewConnector(
+		ConnectorConfig{
+			BrokerAddr: bootstrapAddrs[0],
+		},
+	)
+	// Check the connector error before returning a usable client.
+	if err != nil {
+		return nil, err
+	}
 	return client, nil
 }
@@ -287,10 +293,8 @@ func (c *ZKAdminClient) GetBrokerIDs(ctx context.Context) ([]int, error) {
 	return brokerIDs, nil
 }
 
-// GetBootstrapAddrs returns the stored value of the bootstrapAddrs
-// parameter so it can be used by the messages package.
-func (c *ZKAdminClient) GetBootstrapAddrs() []string {
-	return c.bootstrapAddrs
+func (c *ZKAdminClient) GetConnector() *Connector {
+	return c.Connector
 }
 
 // GetTopics gets information about one or more cluster topics from zookeeper.
@@ -564,20 +568,13 @@ func (c *ZKAdminClient) CreateTopic(
 		return errors.New("Cannot create topic in read-only mode")
 	}
 
-	controllerAddr, err := c.getControllerAddr(ctx)
-	if err != nil {
-		return err
+	req := kafka.CreateTopicsRequest{
+		Topics: []kafka.TopicConfig{config},
 	}
-
-	conn, err := kafka.DefaultDialer.DialContext(ctx, "tcp", controllerAddr)
-	if err != nil {
-		return err
-	}
-	defer conn.Close()
-
 	log.Debugf("Creating topic with config %+v", config)
-	return conn.CreateTopics(config)
+	_, err := c.Connector.KafkaClient.CreateTopics(ctx, &req)
+	return err
 }
 
 // AssignPartitions notifies the cluster to begin a partition reassignment.
diff --git a/pkg/apply/apply_test.go b/pkg/apply/apply_test.go
index a14b4d3d..3851bfcc 100644
--- a/pkg/apply/apply_test.go
+++ b/pkg/apply/apply_test.go
@@ -879,7 +879,7 @@ func TestApplyOverrides(t *testing.T) {
 		},
 	}
 
-	adminClient, err := clusterConfig.NewAdminClient(ctx, nil, false)
+	adminClient, err := clusterConfig.NewAdminClient(ctx, nil, false, "", "")
 	require.NoError(t, err)
 
 	applier, err := NewTopicApplier(
@@ -922,7 +922,7 @@ func testApplier(
 		},
 	}
 
-	adminClient, err := clusterConfig.NewAdminClient(ctx, nil, false)
+	adminClient, err := clusterConfig.NewAdminClient(ctx, nil, false, "", "")
 	require.NoError(t, err)
 
 	applier, err := NewTopicApplier(
diff --git a/pkg/check/check_test.go b/pkg/check/check_test.go
index 7f066acb..592d127e 100644
--- a/pkg/check/check_test.go
+++ b/pkg/check/check_test.go
@@ -28,7 +28,7 @@ func TestCheck(t *testing.T) {
 		},
 	}
 
-	adminClient, err := clusterConfig.NewAdminClient(ctx, nil, false)
+	adminClient, err := clusterConfig.NewAdminClient(ctx, nil, false, "", "")
 	require.NoError(t, err)
 
 	topicName := util.RandomString("check-topic-", 6)
diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go
index 545b50ad..b2070edd 100644
--- a/pkg/cli/cli.go
+++ b/pkg/cli/cli.go
@@ -28,10 +28,9 @@ const (
 
 // CLIRunner is a utility that runs commands from either the command-line or the repl.
 type CLIRunner struct {
-	adminClient  admin.Client
-	groupsClient *groups.Client
-	printer      func(f string, a ...interface{})
-	spinnerObj   *spinner.Spinner
+	adminClient admin.Client
+	printer     func(f string, a ...interface{})
+	spinnerObj  *spinner.Spinner
 }
 
 // NewCLIRunner creates and returns a new CLIRunner instance.
@@ -57,9 +56,6 @@ func NewCLIRunner(
 		printer:     printer,
 		spinnerObj:  spinnerObj,
 	}
-	if adminClient != nil {
-		cliRunner.groupsClient = groups.NewClient(adminClient.GetBootstrapAddrs()[0])
-	}
 	return cliRunner
 }
@@ -316,7 +312,7 @@ func (c *CLIRunner) GetConfig(ctx context.Context, brokerOrTopic string) error {
 func (c *CLIRunner) GetGroups(ctx context.Context) error {
 	c.startSpinner()
 
-	groupCoordinators, err := c.groupsClient.GetGroups(ctx)
+	groupCoordinators, err := groups.GetGroups(ctx, c.adminClient.GetConnector())
 	c.stopSpinner()
 	if err != nil {
 		return err
@@ -330,7 +326,7 @@ func (c *CLIRunner) GetGroups(ctx context.Context) error {
 func (c *CLIRunner) GetGroupMembers(ctx context.Context, groupID string, full bool) error {
 	c.startSpinner()
 
-	groupDetails, err := c.groupsClient.GetGroupDetails(ctx, groupID)
+	groupDetails, err := groups.GetGroupDetails(
+		ctx,
+		c.adminClient.GetConnector(),
+		groupID,
+	)
 	c.stopSpinner()
 	if err != nil {
 		return err
@@ -368,7 +368,12 @@ func (c *CLIRunner) GetMemberLags(
 		return fmt.Errorf("Error fetching topic info: %+v", err)
 	}
 
-	memberLags, err := c.groupsClient.GetMemberLags(ctx, topic, groupID)
+	memberLags, err := groups.GetMemberLags(
+		ctx,
+		c.adminClient.GetConnector(),
+		topic,
+		groupID,
+	)
 	c.stopSpinner()
 
 	if err != nil {
@@ -420,7 +425,7 @@ func (c *CLIRunner) GetOffsets(ctx context.Context, topic string) error {
 
 	bounds, err := messages.GetAllPartitionBounds(
 		ctx,
-		c.adminClient.GetBootstrapAddrs()[0],
+		c.adminClient.GetConnector(),
 		topic,
 		nil,
 	)
@@ -476,7 +481,13 @@ func (c *CLIRunner) ResetOffsets(
 	partitionOffsets map[int]int64,
 ) error {
 	c.startSpinner()
-	err := c.groupsClient.ResetOffsets(ctx, topic, groupID, partitionOffsets)
+	err := groups.ResetOffsets(
+		ctx,
+		c.adminClient.GetConnector(),
+		topic,
+		groupID,
+		partitionOffsets,
+	)
 	c.stopSpinner()
 	if err != nil {
 		return err
@@ -509,7 +520,7 @@ func (c *CLIRunner) Tail(
 	log.Debugf("Tailing partitions %+v", partitions)
 
 	tailer := messages.NewTopicTailer(
-		c.adminClient.GetBootstrapAddrs()[0],
+		c.adminClient.GetConnector(),
 		topic,
 		partitions,
 		offset,
diff --git a/pkg/cli/repl.go b/pkg/cli/repl.go
index 084eb0b6..43fd711f 100644
--- a/pkg/cli/repl.go
+++ b/pkg/cli/repl.go
@@ -85,7 +85,10 @@ type Repl struct {
 }
 
 // NewRepl initializes and returns a Repl instance.
-func NewRepl(ctx context.Context, adminClient admin.Client) (*Repl, error) {
+func NewRepl(
+	ctx context.Context,
+	adminClient admin.Client,
+) (*Repl, error) {
 	cliRunner := NewCLIRunner(
 		adminClient,
 		func(f string, a ...interface{}) {
@@ -149,8 +152,7 @@ func NewRepl(ctx context.Context, adminClient admin.Client) (*Repl, error) {
 	}
 
 	log.Debug("Loading consumer groups for auto-complete")
-	groupsClient := groups.NewClient(adminClient.GetBootstrapAddrs()[0])
-	groupCoordinators, err := groupsClient.GetGroups(ctx)
+	groupCoordinators, err := groups.GetGroups(ctx, adminClient.GetConnector())
 	if err != nil {
 		log.Warnf(
 			"Error getting groups for auto-complete: %+v; auto-complete might not be fully functional",
diff --git a/pkg/config/cluster.go b/pkg/config/cluster.go
index 6604edbc..c307402c 100644
--- a/pkg/config/cluster.go
+++ b/pkg/config/cluster.go
@@ -4,22 +4,13 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"path/filepath"
 	"time"
 
 	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/hashicorp/go-multierror"
 	"github.com/segmentio/topicctl/pkg/admin"
-)
-
-// KafkaVersionMajor is a string type for storing Kafka versions.
-type KafkaVersionMajor string
-
-const (
-	// KafkaVersionMajor010 represents kafka v0.10 and its associated minor versions.
-	KafkaVersionMajor010 KafkaVersionMajor = "v0.10"
-
-	// KafkaVersionMajor2 represents kafka v2 and its associated minor versions.
-	KafkaVersionMajor2 KafkaVersionMajor = "v2"
+	log "github.com/sirupsen/logrus"
 )
 
 // ClusterConfig stores information about a cluster that's referred to by one
@@ -28,6 +19,9 @@ type ClusterConfig struct {
 	Meta ClusterMeta `json:"meta"`
 	Spec ClusterSpec `json:"spec"`
+
+	// RootDir is the root relative to which paths are evaluated. Set by loader.
+	RootDir string `json:"-"`
 }
 
 // ClusterMeta contains (mostly immutable) metadata about the cluster. Inspired
@@ -46,7 +40,7 @@ type ClusterSpec struct {
 	BootstrapAddrs []string `json:"bootstrapAddrs"`
 
 	// ZKAddrs is a list of one or more zookeeper addresses. These can use IPs
-	// or DNS names.
+	// or DNS names. If these are omitted, then the tool will use broker APIs exclusively.
 	ZKAddrs []string `json:"zkAddrs"`
 
 	// ZKPrefix is the prefix under which all zk nodes for the cluster are stored. If blank,
@@ -62,11 +56,6 @@ type ClusterSpec struct {
 	// this check isn't done.
 	ClusterID string `json:"clusterID"`
 
-	// VersionMajor stores the major version of the cluster. This isn't currently
-	// used for any logic in the tool, but it may be used in the future to adjust API calls
-	// and/or decide whether to use zk or brokers for certain information.
-	VersionMajor KafkaVersionMajor `json:"versionMajor"`
-
 	// DefaultThrottleMB is the default broker throttle used for migrations in this
 	// cluster. If unset, then a reasonable default is used instead.
 	DefaultThrottleMB int64 `json:"defaultThrottleMB"`
@@ -75,9 +64,29 @@ type ClusterSpec struct {
 	// limited by. If unset, no retention drop limiting will be applied.
 	DefaultRetentionDropStepDurationStr string `json:"defaultRetentionDropStepDuration"`
 
-	// UseBrokerAdmin indicates whether we should use a broker-api-based admin (if true) or
-	// the old, zk-based admin (if false).
-	UseBrokerAdmin bool `json:"useBrokerAdmin"`
+	// TLS stores how we should use TLS with broker connections, if appropriate. Only
+	// applies if using the broker admin.
+	TLS TLSConfig `json:"tls"`
+
+	// SASL stores how we should use SASL with broker connections, if appropriate. Only
+	// applies if using the broker admin.
+	SASL SASLConfig `json:"sasl"`
+}
+
+type TLSConfig struct {
+	Enabled    bool   `json:"enabled"`
+	CACertPath string `json:"caCertPath"`
+	CertPath   string `json:"certPath"`
+	KeyPath    string `json:"keyPath"`
+	ServerName string `json:"serverName"`
+	SkipVerify bool   `json:"skipVerify"`
+}
+
+type SASLConfig struct {
+	Enabled   bool   `json:"enabled"`
+	Mechanism string `json:"mechanism"`
+	Username  string `json:"username"`
+	Password  string `json:"password"`
 }
 
 // Validate evaluates whether the cluster config is valid.
@@ -100,13 +109,6 @@ func (c ClusterConfig) Validate() error {
 			errors.New("At least one bootstrap broker address must be set"),
 		)
 	}
-	if len(c.Spec.ZKAddrs) == 0 && !c.Spec.UseBrokerAdmin {
-		err = multierror.Append(err, errors.New("At least one zookeeper address must be set"))
-	}
-	if c.Spec.VersionMajor != KafkaVersionMajor010 &&
-		c.Spec.VersionMajor != KafkaVersionMajor2 {
-		multierror.Append(err, errors.New("MajorVersion must be v0.10 or v2"))
-	}
 
 	_, parseErr := c.GetDefaultRetentionDropStepDuration()
 	if parseErr != nil {
@@ -116,6 +118,25 @@ func (c ClusterConfig) Validate() error {
 		)
 	}
 
+	if c.Spec.TLS.Enabled && len(c.Spec.ZKAddrs) > 0 {
+		err = multierror.Append(
+			err,
+			errors.New("TLS not supported with zk access mode; omit zk addresses to fix"),
+		)
+	}
+	if c.Spec.SASL.Enabled && len(c.Spec.ZKAddrs) > 0 {
+		err = multierror.Append(
+			err,
+			errors.New("SASL not supported with zk access mode; omit zk addresses to fix"),
+		)
+	}
+
+	if c.Spec.SASL.Enabled {
+		if saslErr := admin.ValidateSASLMechanism(c.Spec.SASL.Mechanism); saslErr != nil {
+			err = multierror.Append(err, saslErr)
+		}
+	}
+
 	return err
 }
@@ -132,16 +153,53 @@ func (c ClusterConfig) NewAdminClient(
 	ctx context.Context,
 	sess *session.Session,
 	readOnly bool,
+	usernameOverride string,
+	passwordOverride string,
 ) (admin.Client, error) {
-	if c.Spec.UseBrokerAdmin {
+	if len(c.Spec.ZKAddrs) == 0 {
+		log.Debug("No ZK addresses provided, using broker admin client")
+
+		var saslUsername string
+		var saslPassword string
+		if usernameOverride != "" {
+			log.Debugf("Setting SASL username from override value")
+			saslUsername = usernameOverride
+		} else {
+			saslUsername = c.Spec.SASL.Username
+		}
+
+		if passwordOverride != "" {
+			log.Debugf("Setting SASL password from override value")
+			saslPassword = passwordOverride
+		} else {
+			saslPassword = c.Spec.SASL.Password
+		}
+
 		return admin.NewBrokerAdminClient(
 			ctx,
 			admin.BrokerAdminClientConfig{
-				BrokerAddr: c.Spec.BootstrapAddrs[0],
-				ReadOnly:   readOnly,
+				ConnectorConfig: admin.ConnectorConfig{
+					BrokerAddr: c.Spec.BootstrapAddrs[0],
+					TLS: admin.TLSConfig{
+						Enabled:    c.Spec.TLS.Enabled,
+						CACertPath: c.absPath(c.Spec.TLS.CACertPath),
+						// Pass the client cert path through as well; without this,
+						// the connector can never load the configured key pair.
+						CertPath:   c.absPath(c.Spec.TLS.CertPath),
+						KeyPath:    c.absPath(c.Spec.TLS.KeyPath),
+						ServerName: c.Spec.TLS.ServerName,
+						SkipVerify: c.Spec.TLS.SkipVerify,
+					},
+					SASL: admin.SASLConfig{
+						Enabled:   c.Spec.SASL.Enabled,
+						Mechanism: c.Spec.SASL.Mechanism,
+						Username:  saslUsername,
+						Password:  saslPassword,
+					},
+				},
+				ReadOnly: readOnly,
 			},
 		)
 	} else {
+		log.Debug("ZK addresses provided, using zk admin client")
 		return admin.NewZKAdminClient(
 			ctx,
 			admin.ZKAdminClientConfig{
@@ -155,3 +212,11 @@ func (c ClusterConfig) NewAdminClient(
 		)
 	}
 }
+
+func (c ClusterConfig) absPath(relPath string) string {
+	if relPath == "" || c.RootDir == "" || filepath.IsAbs(relPath) {
+		return relPath
+	}
+
+	return filepath.Join(c.RootDir, relPath)
+}
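Since NewAdminClient's signature grew two override parameters, a short sketch of how a caller might wire them up; the environment variable names match the README, while the surrounding code is hypothetical:

    // Sketch only: non-empty overrides beat the values in the cluster config
    // spec; empty strings fall through to the config.
    ctx := context.Background()
    usernameOverride := os.Getenv("TOPICCTL_SASL_USERNAME")
    passwordOverride := os.Getenv("TOPICCTL_SASL_PASSWORD")

    adminClient, err := clusterConfig.NewAdminClient(
        ctx, nil, true, usernameOverride, passwordOverride,
    )
    if err != nil {
        log.Fatal(err)
    }

    // Read-only mode still allows gets, e.g. the cluster ID safety check.
    clusterID, err := adminClient.GetClusterID(ctx)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("connected to cluster %s", clusterID)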
diff --git a/pkg/config/cluster_test.go b/pkg/config/cluster_test.go
index 9e0b3a40..c0987983 100644
--- a/pkg/config/cluster_test.go
+++ b/pkg/config/cluster_test.go
@@ -26,7 +26,6 @@ func TestClusterValidate(t *testing.T) {
 			Spec: ClusterSpec{
 				BootstrapAddrs:                      []string{"broker-addr"},
 				ZKAddrs:                             []string{"zk-addr"},
-				VersionMajor:                        "v2",
 				DefaultRetentionDropStepDurationStr: "5m",
 			},
 		},
@@ -42,7 +41,6 @@ func TestClusterValidate(t *testing.T) {
 			Spec: ClusterSpec{
 				BootstrapAddrs: []string{"broker-addr"},
 				ZKAddrs:        []string{"zk-addr"},
-				VersionMajor:   "v2",
 			},
 		},
 		expError: true,
@@ -57,8 +55,7 @@ func TestClusterValidate(t *testing.T) {
 				Description: "test-description",
 			},
 			Spec: ClusterSpec{
-				ZKAddrs:      []string{"zk-addr"},
-				VersionMajor: "v2",
+				ZKAddrs: []string{"zk-addr"},
 			},
 		},
 		expError: true,
@@ -74,10 +71,9 @@ func TestClusterValidate(t *testing.T) {
 			},
 			Spec: ClusterSpec{
 				BootstrapAddrs: []string{"broker-addr"},
-				VersionMajor:   "v2",
 			},
 		},
-		expError: true,
+		expError: false,
 	},
 	{
 		description: "bad retention drop format",
@@ -91,7 +87,6 @@ func TestClusterValidate(t *testing.T) {
 			Spec: ClusterSpec{
 				BootstrapAddrs:                      []string{"broker-addr"},
 				ZKAddrs:                             []string{"zk-addr"},
-				VersionMajor:                        "v2",
 				DefaultRetentionDropStepDurationStr: "10xxx",
 			},
 		},
diff --git a/pkg/config/load.go b/pkg/config/load.go
index 987ed504..96eeeeba 100644
--- a/pkg/config/load.go
+++ b/pkg/config/load.go
@@ -3,6 +3,7 @@ package config
 import (
 	"errors"
 	"io/ioutil"
+	"path/filepath"
 	"regexp"
 	"strings"
 
@@ -18,7 +19,19 @@ func LoadClusterFile(path string) (ClusterConfig, error) {
 	if err != nil {
 		return ClusterConfig{}, err
 	}
-	return LoadClusterBytes(contents)
+
+	absPath, err := filepath.Abs(path)
+	if err != nil {
+		return ClusterConfig{}, err
+	}
+
+	config, err := LoadClusterBytes(contents)
+	if err != nil {
+		return ClusterConfig{}, err
+	}
+
+	config.RootDir = filepath.Dir(absPath)
+	return config, nil
 }
 
 // LoadClusterBytes loads a ClusterConfig from YAML bytes.
diff --git a/pkg/config/load_test.go b/pkg/config/load_test.go
index bd1bb4c7..233d51b9 100644
--- a/pkg/config/load_test.go
+++ b/pkg/config/load_test.go
@@ -10,6 +10,10 @@ import (
 func TestLoadCluster(t *testing.T) {
 	clusterConfig, err := LoadClusterFile("testdata/test-cluster/cluster.yaml")
 	assert.Nil(t, err)
+
+	// Empty RootDir since this will vary based on where test is run.
+	clusterConfig.RootDir = ""
+
 	assert.Equal(
 		t,
 		ClusterConfig{
@@ -20,7 +24,6 @@ func TestLoadCluster(t *testing.T) {
 				Description: "Test cluster\n",
 			},
 			Spec: ClusterSpec{
-				VersionMajor: KafkaVersionMajor010,
 				BootstrapAddrs: []string{
 					"bootstrap-addr:9092",
 				},
diff --git a/pkg/config/testdata/test-cluster/cluster-invalid.yaml b/pkg/config/testdata/test-cluster/cluster-invalid.yaml
index 541353f5..10192341 100644
--- a/pkg/config/testdata/test-cluster/cluster-invalid.yaml
+++ b/pkg/config/testdata/test-cluster/cluster-invalid.yaml
@@ -7,4 +7,7 @@ meta:
   clusterId: test-cluster-id
 
 spec:
-  versionMajor: v0.40
+  zkAddrs:
+    - localhost:2181
+  tls:
+    enabled: true
diff --git a/pkg/config/testdata/test-cluster/cluster.yaml b/pkg/config/testdata/test-cluster/cluster.yaml
index 788ba588..c615369a 100644
--- a/pkg/config/testdata/test-cluster/cluster.yaml
+++ b/pkg/config/testdata/test-cluster/cluster.yaml
@@ -6,7 +6,6 @@ meta:
     Test cluster
 
 spec:
-  versionMajor: v0.10
  bootstrapAddrs:
     - bootstrap-addr:9092
   zkAddrs:
diff --git a/pkg/groups/client.go b/pkg/groups/groups.go
similarity index 82%
rename from pkg/groups/client.go
rename to pkg/groups/groups.go
index 9122f154..5cac948f 100644
--- a/pkg/groups/client.go
+++ b/pkg/groups/groups.go
@@ -7,30 +7,19 @@ import (
 	"sort"
 
 	"github.com/segmentio/kafka-go"
+	"github.com/segmentio/topicctl/pkg/admin"
 	"github.com/segmentio/topicctl/pkg/messages"
 )
 
-// Client is a struct for getting information about consumer groups from a cluster.
-type Client struct {
-	brokerAddr string
-	client     *kafka.Client
-}
-
-// NewClient creates and returns a new Client instance.
-func NewClient(brokerAddr string) *Client {
-	return &Client{
-		brokerAddr: brokerAddr,
-		client: &kafka.Client{
-			Addr: kafka.TCP(brokerAddr),
-		},
-	}
-}
-
 // GetGroups fetches and returns information about all consumer groups in the cluster.
-func (c *Client) GetGroups(
+func GetGroups(
 	ctx context.Context,
+	connector *admin.Connector,
 ) ([]GroupCoordinator, error) {
-	listGroupsResp, err := c.client.ListGroups(ctx, kafka.ListGroupsRequest{})
+	listGroupsResp, err := connector.KafkaClient.ListGroups(
+		ctx,
+		kafka.ListGroupsRequest{},
+	)
 
 	// Don't immediately fail if err is non-nil; instead, just process and return
 	// whatever results are returned.
@@ -54,11 +43,12 @@
 }
 
 // GetGroupDetails returns the details (membership, etc.) for a single consumer group.
-func (c *Client) GetGroupDetails(
+func GetGroupDetails(
 	ctx context.Context,
+	connector *admin.Connector,
 	groupID string,
 ) (*GroupDetails, error) {
-	describeGroupsResponse, err := c.client.DescribeGroup(
+	describeGroupsResponse, err := connector.KafkaClient.DescribeGroup(
 		ctx,
 		kafka.DescribeGroupsRequest{GroupIDs: []string{groupID}},
 	)
@@ -113,12 +103,13 @@
 
 // GetMemberLags returns the lag for each partition being consumed by the argument group in the
 // argument topic.
-func (c *Client) GetMemberLags(
+func GetMemberLags(
 	ctx context.Context,
+	connector *admin.Connector,
 	topic string,
 	groupID string,
 ) ([]MemberPartitionLag, error) {
-	groupDetails, err := c.GetGroupDetails(ctx, groupID)
+	groupDetails, err := GetGroupDetails(ctx, connector, groupID)
 	if err != nil {
 		return nil, err
 	}
@@ -129,7 +120,7 @@
 
 	partitionMembers := groupDetails.PartitionMembers(topic)
 
-	offsets, err := c.client.ConsumerOffsets(
+	offsets, err := connector.KafkaClient.ConsumerOffsets(
 		ctx, kafka.TopicAndGroup{
 			Topic:   topic,
 			GroupId: groupID,
@@ -139,7 +130,7 @@
 		return nil, err
 	}
 
-	bounds, err := messages.GetAllPartitionBounds(ctx, c.brokerAddr, topic, offsets)
+	bounds, err := messages.GetAllPartitionBounds(ctx, connector, topic, offsets)
 	if err != nil {
 		return nil, err
 	}
@@ -167,8 +158,9 @@
 }
 
 // ResetOffsets updates the offsets for a given topic / group combination.
-func (c *Client) ResetOffsets(
+func ResetOffsets(
 	ctx context.Context,
+	connector *admin.Connector,
 	topic string,
 	groupID string,
 	partitionOffsets map[int]int64,
@@ -176,8 +168,9 @@
 	consumerGroup, err := kafka.NewConsumerGroup(
 		kafka.ConsumerGroupConfig{
 			ID:      groupID,
-			Brokers: []string{c.brokerAddr},
+			Brokers: []string{connector.Config.BrokerAddr},
 			Topics:  []string{topic},
+			Dialer:  connector.Dialer,
 		},
 	)
 	if err != nil {
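With the groups client struct gone, callers now pass a Connector into free functions. A sketch from the caller's perspective (the group ID is a placeholder; the connector is constructed as in the earlier sketch):

    // Sketch only: list all consumer groups, then inspect one.
    coordinators, err := groups.GetGroups(ctx, connector)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("found %d consumer groups", len(coordinators))

    details, err := groups.GetGroupDetails(ctx, connector, "my-group") // placeholder ID
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("group state: %s", details.State)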
diff --git a/pkg/groups/client_test.go b/pkg/groups/groups_test.go
similarity index 67%
rename from pkg/groups/client_test.go
rename to pkg/groups/groups_test.go
index 282553cb..f0e431d9 100644
--- a/pkg/groups/client_test.go
+++ b/pkg/groups/groups_test.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"github.com/segmentio/kafka-go"
+	"github.com/segmentio/topicctl/pkg/admin"
 	"github.com/segmentio/topicctl/pkg/util"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -14,12 +15,18 @@ import (
 
 func TestGetGroups(t *testing.T) {
 	ctx := context.Background()
-	topicName := createTestTopic(ctx, t)
+	connector, err := admin.NewConnector(admin.ConnectorConfig{
+		BrokerAddr: util.TestKafkaAddr(),
+	})
+	require.NoError(t, err)
+
+	topicName := createTestTopic(ctx, t, connector)
 	groupID := fmt.Sprintf("test-group-%s", topicName)
 
 	reader := kafka.NewReader(
 		kafka.ReaderConfig{
-			Brokers:  []string{util.TestKafkaAddr()},
+			Brokers:  []string{connector.Config.BrokerAddr},
+			Dialer:   connector.Dialer,
 			GroupID:  groupID,
 			Topic:    topicName,
 			MinBytes: 50,
@@ -35,8 +42,7 @@ func TestGetGroups(t *testing.T) {
 		require.NoError(t, err)
 	}
 
-	client := NewClient(util.TestKafkaAddr())
-	groups, err := client.GetGroups(ctx)
+	groups, err := GetGroups(ctx, connector)
 	require.NoError(t, err)
 
 	// There could be older groups in here, just ignore them
@@ -52,7 +58,7 @@ func TestGetGroups(t *testing.T) {
 	}
 	require.True(t, match)
 
-	groupDetails, err := client.GetGroupDetails(ctx, groupID)
+	groupDetails, err := GetGroupDetails(ctx, connector, groupID)
 	require.NoError(t, err)
 	assert.Equal(t, groupID, groupDetails.GroupID)
 	assert.Equal(t, "Stable", groupDetails.State)
@@ -70,12 +76,18 @@ func TestGetGroups(t *testing.T) {
 
 func TestGetLags(t *testing.T) {
 	ctx := context.Background()
-	topicName := createTestTopic(ctx, t)
+	connector, err := admin.NewConnector(admin.ConnectorConfig{
+		BrokerAddr: util.TestKafkaAddr(),
+	})
+	require.NoError(t, err)
+
+	topicName := createTestTopic(ctx, t, connector)
 	groupID := fmt.Sprintf("test-group-%s", topicName)
 
 	reader := kafka.NewReader(
 		kafka.ReaderConfig{
-			Brokers:  []string{util.TestKafkaAddr()},
+			Brokers:  []string{connector.Config.BrokerAddr},
+			Dialer:   connector.Dialer,
 			GroupID:  groupID,
 			Topic:    topicName,
 			MinBytes: 50,
@@ -91,8 +103,7 @@ func TestGetLags(t *testing.T) {
 		require.NoError(t, err)
 	}
 
-	client := NewClient(util.TestKafkaAddr())
-	lags, err := client.GetMemberLags(ctx, topicName, groupID)
+	lags, err := GetMemberLags(ctx, connector, topicName, groupID)
 	require.NoError(t, err)
 
 	require.Equal(t, 2, len(lags))
@@ -105,12 +116,18 @@ func TestGetLags(t *testing.T) {
 
 func TestResetOffsets(t *testing.T) {
 	ctx := context.Background()
-	topicName := createTestTopic(ctx, t)
+	connector, err := admin.NewConnector(admin.ConnectorConfig{
+		BrokerAddr: util.TestKafkaAddr(),
+	})
+	require.NoError(t, err)
+
+	topicName := createTestTopic(ctx, t, connector)
 	groupID := fmt.Sprintf("test-group-%s", topicName)
 
 	reader := kafka.NewReader(
 		kafka.ReaderConfig{
-			Brokers:  []string{util.TestKafkaAddr()},
+			Brokers:  []string{connector.Config.BrokerAddr},
+			Dialer:   connector.Dialer,
 			GroupID:  groupID,
 			Topic:    topicName,
 			MinBytes: 50,
@@ -126,9 +143,10 @@ func TestResetOffsets(t *testing.T) {
 		require.NoError(t, err)
 	}
 
-	client := NewClient(util.TestKafkaAddr())
-	err := client.ResetOffsets(
+	require.NoError(t, err)
+	err = ResetOffsets(
 		ctx,
+		connector,
 		topicName,
 		groupID,
 		map[int]int64{
@@ -138,7 +156,7 @@ func TestResetOffsets(t *testing.T) {
 	)
 	require.NoError(t, err)
 
-	lags, err := client.GetMemberLags(ctx, topicName, groupID)
+	lags, err := GetMemberLags(ctx, connector, topicName, groupID)
 	require.NoError(t, err)
 
 	require.Equal(t, 2, len(lags))
@@ -146,17 +164,22 @@ func TestResetOffsets(t *testing.T) {
 	assert.Equal(t, int64(1), lags[1].MemberOffset)
 }
 
-func createTestTopic(ctx context.Context, t *testing.T) string {
-	controllerConn := util.TestKafkaContollerConn(ctx, t)
-	defer controllerConn.Close()
-
+func createTestTopic(
+	ctx context.Context,
+	t *testing.T,
+	connector *admin.Connector,
+) string {
 	topicName := util.RandomString("topic-groups-", 6)
-
-	err := controllerConn.CreateTopics(
-		kafka.TopicConfig{
-			Topic:             topicName,
-			NumPartitions:     2,
-			ReplicationFactor: 1,
+	_, err := connector.KafkaClient.CreateTopics(
+		ctx,
+		&kafka.CreateTopicsRequest{
+			Topics: []kafka.TopicConfig{
+				{
+					Topic:             topicName,
+					NumPartitions:     2,
+					ReplicationFactor: 1,
+				},
+			},
 		},
 	)
 	require.NoError(t, err)
@@ -164,7 +187,8 @@ func createTestTopic(
 
 	writer := kafka.NewWriter(
 		kafka.WriterConfig{
-			Brokers:   []string{util.TestKafkaAddr()},
+			Brokers:   []string{connector.Config.BrokerAddr},
+			Dialer:    connector.Dialer,
 			Topic:     topicName,
 			BatchSize: 10,
 		},
diff --git a/pkg/messages/bounds.go b/pkg/messages/bounds.go
index 428491a6..8ad973f6 100644
--- a/pkg/messages/bounds.go
+++ b/pkg/messages/bounds.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"github.com/segmentio/kafka-go"
+	"github.com/segmentio/topicctl/pkg/admin"
 	log "github.com/sirupsen/logrus"
 
 	// Read snappy-compressed messages
@@ -45,11 +46,11 @@ type Bounds struct {
 // is nil, the starting offset in each topic partition.
 func GetAllPartitionBounds(
 	ctx context.Context,
-	brokerAddr string,
+	connector *admin.Connector,
 	topic string,
 	baseOffsets map[int]int64,
 ) ([]Bounds, error) {
-	conn, err := kafka.DefaultDialer.DialContext(ctx, "tcp", brokerAddr)
+	conn, err := connector.Dialer.DialContext(ctx, "tcp", connector.Config.BrokerAddr)
 	if err != nil {
 		return nil, err
 	}
@@ -90,7 +91,7 @@ func GetAllPartitionBounds(
 
 		bounds, err := GetPartitionBounds(
 			ctx,
-			brokerAddr,
+			connector,
 			topic,
 			nextPartition.ID,
 			minOffset,
@@ -131,7 +132,7 @@ func GetAllPartitionBounds(
 // this is used instead of the actual first offset.
 func GetPartitionBounds(
 	ctx context.Context,
-	brokerAddr string,
+	connector *admin.Connector,
 	topic string,
 	partition int,
 	minOffset int64,
@@ -143,7 +144,7 @@ func GetPartitionBounds(
 		minOffset,
 	)
 
-	conn, err := dialLeaderRetries(ctx, brokerAddr, topic, partition)
+	conn, err := dialLeaderRetries(ctx, connector, topic, partition)
 	if err != nil {
 		return Bounds{}, err
 	}
@@ -205,7 +206,7 @@ func GetPartitionBounds(
 
 	// Use a separate connection for reading the last message. For whatever reason,
 	// reusing the same connection with kafka-go on newer kafka versions can lead to read errors.
-	conn2, err := dialLeaderRetries(ctx, brokerAddr, topic, partition)
+	conn2, err := dialLeaderRetries(ctx, connector, topic, partition)
 	if err != nil {
 		return Bounds{}, err
 	}
@@ -243,7 +244,7 @@ func GetPartitionBounds(
 
 func dialLeaderRetries(
 	ctx context.Context,
-	brokerAddr string,
+	connector *admin.Connector,
 	topic string,
 	partition int,
 ) (*kafka.Conn, error) {
@@ -253,7 +254,13 @@ func dialLeaderRetries(
 	sleepDuration := backoffInitSleepDuration
 
 	for i := 0; i < maxRetries; i++ {
-		conn, err = kafka.DialLeader(ctx, "tcp", brokerAddr, topic, partition)
+		conn, err = connector.Dialer.DialLeader(
+			ctx,
+			"tcp",
+			connector.Config.BrokerAddr,
+			topic,
+			partition,
+		)
 		if err == nil {
 			break
 		}
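The bounds helpers follow the same connector-passing pattern; a sketch of the updated call (the topic name is a placeholder):

    // Sketch only: offset bounds for every partition of a topic, starting
    // from the beginning of each partition (nil base offsets).
    bounds, err := messages.GetAllPartitionBounds(ctx, connector, "my-topic", nil)
    if err != nil {
        log.Fatal(err)
    }
    for _, b := range bounds {
        log.Printf("%+v", b)
    }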
diff --git a/pkg/messages/bounds_test.go b/pkg/messages/bounds_test.go
index 88df155c..ed768903 100644
--- a/pkg/messages/bounds_test.go
+++ b/pkg/messages/bounds_test.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"github.com/segmentio/kafka-go"
+	"github.com/segmentio/topicctl/pkg/admin"
 	"github.com/segmentio/topicctl/pkg/util"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -14,16 +15,22 @@ import (
 
 func TestGetAllPartitionBounds(t *testing.T) {
 	ctx := context.Background()
-	controllerConn := util.TestKafkaContollerConn(ctx, t)
-	defer controllerConn.Close()
+	connector, err := admin.NewConnector(admin.ConnectorConfig{
+		BrokerAddr: util.TestKafkaAddr(),
+	})
+	require.NoError(t, err)
 
 	topicName := util.RandomString("topic-bounds-", 6)
-
-	err := controllerConn.CreateTopics(
-		kafka.TopicConfig{
-			Topic:             topicName,
-			NumPartitions:     4,
-			ReplicationFactor: 1,
+	_, err = connector.KafkaClient.CreateTopics(
+		ctx,
+		&kafka.CreateTopicsRequest{
+			Topics: []kafka.TopicConfig{
+				{
+					Topic:             topicName,
+					NumPartitions:     4,
+					ReplicationFactor: 1,
+				},
+			},
 		},
 	)
 	require.NoError(t, err)
@@ -31,7 +38,8 @@ func TestGetAllPartitionBounds(t *testing.T) {
 
 	writer := kafka.NewWriter(
 		kafka.WriterConfig{
-			Brokers:  []string{util.TestKafkaAddr()},
+			Brokers:  []string{connector.Config.BrokerAddr},
+			Dialer:   connector.Dialer,
 			Topic:    topicName,
 			Balancer: &kafka.RoundRobin{},
 		},
@@ -53,7 +61,7 @@ func TestGetAllPartitionBounds(t *testing.T) {
 	err = writer.WriteMessages(ctx, messages...)
 	require.NoError(t, err)
 
-	bounds, err := GetAllPartitionBounds(ctx, util.TestKafkaAddr(), topicName, nil)
+	bounds, err := GetAllPartitionBounds(ctx, connector, topicName, nil)
 	assert.Nil(t, err)
 
 	// The first partition gets 3 messages
@@ -69,7 +77,7 @@ func TestGetAllPartitionBounds(t *testing.T) {
 
 	boundsWithOffsets, err := GetAllPartitionBounds(
 		ctx,
-		util.TestKafkaAddr(),
+		connector,
 		topicName,
 		map[int]int64{
 			0: 1,
diff --git a/pkg/messages/tail.go b/pkg/messages/tail.go
index 71efbae9..481d185f 100644
--- a/pkg/messages/tail.go
+++ b/pkg/messages/tail.go
@@ -10,6 +10,7 @@ import (
 
 	"github.com/fatih/color"
 	"github.com/segmentio/kafka-go"
+	"github.com/segmentio/topicctl/pkg/admin"
 	"github.com/segmentio/topicctl/pkg/util"
 	log "github.com/sirupsen/logrus"
 
@@ -22,7 +23,7 @@ import (
 
 // TopicTailer fetches a stream of messages from a topic.
 type TopicTailer struct {
-	brokerAddr string
+	Connector  *admin.Connector
 	topic      string
 	partitions []int
 	offset     int64
@@ -32,7 +33,7 @@ type TopicTailer struct {
 
 // NewTopicTailer returns a new TopicTailer instance.
 func NewTopicTailer(
-	brokerAddr string,
+	Connector *admin.Connector,
 	topic string,
 	partitions []int,
 	offset int64,
@@ -40,7 +41,7 @@ func NewTopicTailer(
 	maxBytes int,
 ) *TopicTailer {
 	return &TopicTailer{
-		brokerAddr: brokerAddr,
+		Connector:  Connector,
 		topic:      topic,
 		partitions: partitions,
 		offset:     offset,
@@ -86,7 +87,8 @@ func (t *TopicTailer) GetMessages(
 	for _, partition := range t.partitions {
 		reader := kafka.NewReader(
 			kafka.ReaderConfig{
-				Brokers:   []string{t.brokerAddr},
+				Brokers:   []string{t.Connector.Config.BrokerAddr},
+				Dialer:    t.Connector.Dialer,
 				Topic:     t.topic,
 				Partition: partition,
 				MinBytes:  t.minBytes,
diff --git a/pkg/messages/tail_test.go b/pkg/messages/tail_test.go
index c46c81b8..8011d982 100644
--- a/pkg/messages/tail_test.go
+++ b/pkg/messages/tail_test.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"github.com/segmentio/kafka-go"
+	"github.com/segmentio/topicctl/pkg/admin"
 	"github.com/segmentio/topicctl/pkg/util"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -16,15 +17,22 @@ func TestTailerGetMessages(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	controllerConn := util.TestKafkaContollerConn(ctx, t)
+	connector, err := admin.NewConnector(admin.ConnectorConfig{
+		BrokerAddr: util.TestKafkaAddr(),
+	})
+	require.NoError(t, err)
 
 	topicName := util.RandomString("topic-tail-", 6)
-
-	err := controllerConn.CreateTopics(
-		kafka.TopicConfig{
-			Topic:             topicName,
-			NumPartitions:     4,
-			ReplicationFactor: 1,
+	_, err = connector.KafkaClient.CreateTopics(
+		ctx,
+		&kafka.CreateTopicsRequest{
+			Topics: []kafka.TopicConfig{
+				{
+					Topic:             topicName,
+					NumPartitions:     4,
+					ReplicationFactor: 1,
+				},
+			},
 		},
 	)
 	require.NoError(t, err)
@@ -32,7 +40,8 @@ func TestTailerGetMessages(t *testing.T) {
 
 	writer := kafka.NewWriter(
 		kafka.WriterConfig{
-			Brokers:  []string{util.TestKafkaAddr()},
+			Brokers:  []string{connector.Config.BrokerAddr},
+			Dialer:   connector.Dialer,
 			Topic:    topicName,
 			Balancer: &kafka.RoundRobin{},
 		},
@@ -55,7 +64,7 @@ func TestTailerGetMessages(t *testing.T) {
 	require.NoError(t, err)
 
 	tailer := NewTopicTailer(
-		util.TestKafkaAddr(),
+		connector,
 		topicName,
 		[]int{0, 1, 2, 3},
 		kafka.FirstOffset,
diff --git a/pkg/util/testing.go b/pkg/util/testing.go
index 0d4db00b..33301be6 100644
--- a/pkg/util/testing.go
+++ b/pkg/util/testing.go
@@ -1,14 +1,12 @@
 package util
 
 import (
-	"context"
 	"fmt"
 	"math/rand"
 	"os"
 	"testing"
 	"time"
 
-	"github.com/segmentio/kafka-go"
 	"github.com/stretchr/testify/require"
 )
 
@@ -40,32 +38,6 @@ func TestKafkaAddr() string {
 	return testKafkaAddr
 }
 
-// TestKafkaConn returns a kafka-go connection for unit testing purposes.
-func TestKafkaConn(ctx context.Context, t *testing.T) *kafka.Conn {
-	conn, err := kafka.DefaultDialer.DialContext(ctx, "tcp", TestKafkaAddr())
-	require.NoError(t, err)
-	return conn
-}
-
-// TestKafkaContollerConn returns a kafka-go connection to the cluster controller
-// for unit testing purposes.
-func TestKafkaContollerConn(ctx context.Context, t *testing.T) *kafka.Conn {
-	conn := TestKafkaConn(ctx, t)
-	defer conn.Close()
-
-	broker, err := conn.Controller()
-	require.NoError(t, err)
-
-	controllerConn, err := kafka.DefaultDialer.DialContext(
-		ctx,
-		"tcp",
-		fmt.Sprintf("%s:%d", broker.Host, broker.Port),
-	)
-
-	require.NoError(t, err)
-	return controllerConn
-}
-
 func CanTestBrokerAdmin() bool {
 	value, ok := os.LookupEnv("KAFKA_TOPICS_TEST_BROKER_ADMIN")
 	if ok && value != "" {