Skip to content

Commit

Permalink
Merge pull request #180 from ahamidi/support-tls
Browse files Browse the repository at this point in the history
Support TLS connection to Kafka
  • Loading branch information
horkhe committed Feb 13, 2020
2 parents 1c124b4 + 0137bed commit 91fc2e6
Show file tree
Hide file tree
Showing 10 changed files with 501 additions and 10 deletions.
4 changes: 4 additions & 0 deletions README.md
Expand Up @@ -451,6 +451,10 @@ must be specified in order to run with security enabled.

If configured, both the gRPC and HTTP servers will run with TLS enabled.

Additionally TLS may be configured for the Kafka cluster by enabling `tls` in
the `kafka` section of the configuration YAML (along with any required
certificates). Details can be found in the default YAML file (`default.yaml`).

## License

Kafka-Pixy is under the Apache 2.0 license. See the [LICENSE](LICENSE) file for details.
111 changes: 110 additions & 1 deletion config/config.go
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"encoding/hex"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -70,7 +71,6 @@ func (lc *LoggerCfg) Level() log.Level {
return level
}


// Proxy defines configuration of a proxy to a particular Kafka/ZooKeeper
// cluster.
type Proxy struct {
Expand All @@ -87,6 +87,28 @@ type Proxy struct {

// Version of the Kafka cluster. Supported versions are 0.10.2.1 - 2.0.0
Version KafkaVersion

// Optionally use TLS when connecting to Kafka. This must be enabled
// for following options to be used.
TLSEnabled bool `yaml:"tls"`

// The path to the CA certificate (PEM)
CACertFile string `yaml:"ca_certificate_file"`

// The path to the Client Certificate (PEM)
ClientCertFile string `yaml:"client_certificate_file"`

// The path to the Client Key (PEM)
ClientCertKeyFile string `yaml:"client_key_file"`

// From the tls package:
// InsecureSkipVerify controls whether a client verifies the
// server's certificate chain and host name.
// If InsecureSkipVerify is true, TLS accepts any certificate
// presented by the server and any host name in that certificate.
// In this mode, TLS is susceptible to man-in-the-middle attacks.
// This should be used only for testing.
InsecureSkipVerify bool `yaml:"insecure"`
} `yaml:"kafka"`

ZooKeeper struct {
Expand Down Expand Up @@ -337,6 +359,13 @@ func (p *Proxy) SaramaProducerCfg() *sarama.Config {
saramaCfg.Producer.RequiredAcks = sarama.RequiredAcks(p.Producer.RequiredAcks)
saramaCfg.Producer.Partitioner, _ = p.Producer.Partitioner.ToPartitionerConstructor()
saramaCfg.Producer.Timeout = p.Producer.Timeout

if p.Kafka.TLSEnabled {
saramaCfg.Net.TLS.Enable = true
tlsCfg, _ := p.newTLSConfig() // Ok to ignore err since we validated
saramaCfg.Net.TLS.Config = tlsCfg
}

return saramaCfg
}

Expand All @@ -350,9 +379,48 @@ func (p *Proxy) SaramaClientCfg() *sarama.Config {
saramaCfg.Net.ReadTimeout = p.Net.ReadTimeout
saramaCfg.Net.WriteTimeout = p.Net.WriteTimeout

if p.Kafka.TLSEnabled {
saramaCfg.Net.TLS.Enable = true
tlsCfg, _ := p.newTLSConfig() // Ok to ignore err since we validated
saramaCfg.Net.TLS.Config = tlsCfg
}

return saramaCfg
}

func (p *Proxy) newTLSConfig() (*tls.Config, error) {
tlsConfig := &tls.Config{
InsecureSkipVerify: p.Kafka.InsecureSkipVerify,
}

if p.Kafka.CACertFile != "" {
// build root CA
roots := x509.NewCertPool()
caCert, err := ioutil.ReadFile(p.Kafka.CACertFile)
if err != nil {
return nil, err
}
ok := roots.AppendCertsFromPEM(caCert)
if !ok {
return nil, err
}

tlsConfig.RootCAs = roots
}

if p.Kafka.ClientCertFile != "" && p.Kafka.ClientCertKeyFile != "" {
// setup client certs
cert, err := tls.LoadX509KeyPair(p.Kafka.ClientCertFile, p.Kafka.ClientCertKeyFile)
if err != nil {
return nil, err
}
tlsConfig.Certificates = []tls.Certificate{cert}
}

tlsConfig.BuildNameToCertificate()
return tlsConfig, nil
}

// DefaultApp returns default application configuration where default proxy has
// the specified cluster.
func DefaultApp(cluster string) *App {
Expand Down Expand Up @@ -486,6 +554,47 @@ func (p *Proxy) validate() error {
case p.Consumer.RetryBackoff <= 0:
return errors.New("consumer.retry_backoff must be > 0")
}

// Validate TLS configuration.
if err := p.validateTLS(); err != nil {
return fmt.Errorf("invalid tls configuration: %q", err)
}

return nil
}

func (p *Proxy) validateTLS() error {
// Validate the CA certificate
if p.Kafka.CACertFile != "" {
caCert, err := ioutil.ReadFile(p.Kafka.CACertFile)
if err != nil {
return fmt.Errorf("kafka.ca_cert_file: %s", err)
}
roots := x509.NewCertPool()
ok := roots.AppendCertsFromPEM(caCert)
if !ok {
return errors.New("kafka.ca_cert_file does not appear to be a valid CA certificate")
}
}

// Validate the client certificate and key
if p.Kafka.ClientCertFile != "" && p.Kafka.ClientCertKeyFile != "" {
_, err := ioutil.ReadFile(p.Kafka.ClientCertFile)
if err != nil {
return fmt.Errorf("kafka.client_cert_file: %s", err)
}

_, err = ioutil.ReadFile(p.Kafka.ClientCertKeyFile)
if err != nil {
return fmt.Errorf("kafka.client_cert_key_file: %s", err)
}

_, err = tls.LoadX509KeyPair(p.Kafka.ClientCertFile, p.Kafka.ClientCertKeyFile)
if err != nil {
return errors.New("kafka.client_cert_file and kafka.client_cert_key_file are not a valid certificate pair")
}
}

return nil
}

Expand Down
15 changes: 15 additions & 0 deletions config/config_test.go
Expand Up @@ -139,3 +139,18 @@ func (s *ConfigSuite) TestFromYAMLCustomAddresses(c *C) {
c.Assert(appCfg.GRPCAddr, Equals, expected.GRPCAddr)
c.Assert(appCfg.UnixAddr, Equals, expected.UnixAddr)
}

func (s *ConfigSuite) TestFromYAMLKafkaTLS(c *C) {
// When
appCfg, err := FromYAMLFile("../testdata/kafka-tls.yaml")

// Then
c.Assert(err, IsNil)

kafkaCfg := appCfg.Proxies["default"].Kafka
c.Assert(kafkaCfg.TLSEnabled, Equals, true)
c.Assert(kafkaCfg.CACertFile, Equals, "../testdata/ca.crt")
c.Assert(kafkaCfg.ClientCertFile, Equals, "../testdata/client.crt")
c.Assert(kafkaCfg.ClientCertKeyFile, Equals, "../testdata/client.key")
c.Assert(kafkaCfg.InsecureSkipVerify, Equals, false)
}
15 changes: 15 additions & 0 deletions default.yaml
Expand Up @@ -34,6 +34,21 @@ proxies:
# Version of the Kafka cluster. Supported versions are 0.10.2.1 - 2.0.0
version: 0.10.2.1

# Enable TLS when connecting to the Kafka cluster
tls: false

# The filepath to the CA root certificate
# ca_certificate_file:

# The filepath to the client certificate
# client_certificate_file:

# The filepath to the client certificate key
# client_key_file:

# Disable hostname verification
# insecure: false

# Networking parameters section. These all pass through to sarama's
# `config.Net` field.
net:
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Expand Up @@ -89,6 +89,7 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mailgun/holster v3.0.0+incompatible h1:bpt8ZCwLBrzjqfBZ5mobNb2NjesNeDHmsOO++Ek9Swc=
github.com/mailgun/holster v3.0.0+incompatible/go.mod h1:crzolGx27RP/IBT/BnPQiYBB9igmAFHGRrz0zlMP0b0=
github.com/mailgun/holster/v3 v3.8.0 h1:unYHTrMBlMqzozNrddT84jC/jRfZ6h0jGVLXYQr9mZw=
github.com/mailgun/holster/v3 v3.8.0/go.mod h1:rNcFlhMTxFDa1dnQC4sUqI71IpAa9/aPeU6w8IGF3aQ=
github.com/mailgun/logrus-hooks v1.2.1 h1:1paOVAsjXbvqoI1dft6gG+8WRc+CPJtnJjOjxhBkmuo=
github.com/mailgun/logrus-hooks v1.2.1/go.mod h1:YRm/XRR8Kmhtyq6FbqszTPq+RxEHvtc1Dlf0Beo8P1k=
Expand Down Expand Up @@ -144,6 +145,7 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/thrawn01/args v0.3.0 h1:XbMnfGaw6nFbm8hgSncHu20cGrZMTP8BnxiusA43AeE=
github.com/thrawn01/args v0.3.0/go.mod h1:TnRiOFjyh7Wa6oC8ACFPc7KIvbzCiluphA3mJUiPIEo=
Expand Down Expand Up @@ -184,6 +186,7 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 h1:4y9KwBHBgBNwDbtu44R5o1fdOCQUEXhbk/P4A9WmJq0=
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
43 changes: 34 additions & 9 deletions main.go
Expand Up @@ -17,14 +17,19 @@ import (
)

var (
cmdGRPCAddr string
cmdConfig string
cmdTCPAddr string
cmdUnixAddr string
cmdKafkaPeers string
cmdZookeeperPeers string
cmdPIDFile string
cmdLoggingJSONCfg string
cmdGRPCAddr string
cmdConfig string
cmdTCPAddr string
cmdUnixAddr string
cmdKafkaPeers string
cmdZookeeperPeers string
cmdPIDFile string
cmdLoggingJSONCfg string
cmdTLSEnabled bool
cmdCACertFile string
cmdClientCertFile string
cmdClientCertKeyFile string
cmdInsecure bool
)

func init() {
Expand All @@ -36,6 +41,11 @@ func init() {
flag.StringVar(&cmdZookeeperPeers, "zookeeperPeers", "", "Comma separated list of ZooKeeper nodes followed by optional chroot")
flag.StringVar(&cmdPIDFile, "pidFile", "", "Path to the PID file")
flag.StringVar(&cmdLoggingJSONCfg, "logging", "", "Logging configuration")
flag.BoolVar(&cmdTLSEnabled, "tls", false, "Enable TLS (Kafka consumer/producer)")
flag.StringVar(&cmdCACertFile, "caCertFile", "", "CA certificate file")
flag.StringVar(&cmdClientCertFile, "clientCertFile", "", "Client certificate file")
flag.StringVar(&cmdClientCertKeyFile, "clientCertKeyFile", "", "Client certificate key file")
flag.BoolVar(&cmdInsecure, "insecureTLS", false, "Disable TLS hostname verification")
flag.Parse()
}

Expand Down Expand Up @@ -108,6 +118,21 @@ func makeConfig() (*config.App, error) {
if cmdKafkaPeers != "" {
cfg.Proxies[cfg.DefaultCluster].Kafka.SeedPeers = strings.Split(cmdKafkaPeers, ",")
}
if cmdTLSEnabled {
cfg.Proxies[cfg.DefaultCluster].Kafka.TLSEnabled = cmdTLSEnabled
}
if cmdCACertFile != "" {
cfg.Proxies[cfg.DefaultCluster].Kafka.CACertFile = cmdCACertFile
}
if cmdClientCertFile != "" {
cfg.Proxies[cfg.DefaultCluster].Kafka.ClientCertFile = cmdClientCertFile
}
if cmdClientCertKeyFile != "" {
cfg.Proxies[cfg.DefaultCluster].Kafka.ClientCertKeyFile = cmdClientCertKeyFile
}
if cmdInsecure {
cfg.Proxies[cfg.DefaultCluster].Kafka.InsecureSkipVerify = cmdInsecure
}
if cmdZookeeperPeers != "" {
chrootStartIdx := strings.Index(cmdZookeeperPeers, "/")
if chrootStartIdx >= 0 {
Expand All @@ -119,7 +144,7 @@ func makeConfig() (*config.App, error) {
}
setter.SetDefault(&cfg.Logging, []config.LoggerCfg{
{
Name: "console",
Name: "console",
Severity: "info",
},
})
Expand Down
19 changes: 19 additions & 0 deletions testdata/ca.crt
@@ -0,0 +1,19 @@
-----BEGIN CERTIFICATE-----
MIIDGjCCAgICCQDur4OFb/KKXDANBgkqhkiG9w0BAQsFADBPMQswCQYDVQQGEwJV
UzETMBEGA1UECAwKQ2FsaWZvcm5pYTEWMBQGA1UEBwwNU2FuIEZyYW5jaXNjbzET
MBEGA1UECgwKa2Fma2EtcGl4eTAeFw0yMDAyMTAwNTAxNTlaFw0yNTAyMDgwNTAx
NTlaME8xCzAJBgNVBAYTAlVTMRMwEQYDVQQIDApDYWxpZm9ybmlhMRYwFAYDVQQH
DA1TYW4gRnJhbmNpc2NvMRMwEQYDVQQKDAprYWZrYS1waXh5MIIBIjANBgkqhkiG
9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsDKjPb6BWCMracmh3J3mzcZ0FSSyhUFVcLPi
lROHnSZieDFess9mFBJoiIWSZsttn87dvQz3WlODkCyXmXUAM9sA0vD7LwrzkzX2
S5AsVuv1xahlvhMY6/V3nDDubqnpnPl9T+5b3BNlFiXLkrM2/5FkQ8OnO6bXUO/X
qnnBeUdjUvns0UV/s+1+i3rX9V3/jrH2nkMB+JUIOMsQgEySeiNClRSxzbA5BdC0
fVoW1WY0xmjo1EP0AcDesWn7GooUD4+wBB70WEZXrADMvNIW84Ht2CliuNS8ZASp
y0pvv5iBYMcXIIJgoAbCndnHWTnxwmA/Zq6gBhNp52hOJEYkmQIDAQABMA0GCSqG
SIb3DQEBCwUAA4IBAQBpbDNqwJqQUsBJmSrrR1jfhqqTGiA2D6yGDcuVlZDOerIO
X0LNw9xf0NKzAoBwbyXAvq82tiNO8Xa/6dnybzILwQzfiCj7YMnuNaiZO6LgAh0S
AMKuoqn1Gm8h2lSwWhofFK+pBWHgOpOiwESmVS45DkOV1WcEjjP5BO4zoKx4BujS
za/GlgM+Albwz5hVIOG88rbFCvKK2RphaDPZz97HcmZhNZfAHE2oaTajSxnftGlF
wfa4CLvNS7wosZ06uqUi/yZWRh4NCY0wa+atueAgYFwq1ppgbSfXmwI6TQVnR/+n
J/JpDeZMXwqIy1QU9AcWQ3kvg8QhtyNVWNdk4pFZ
-----END CERTIFICATE-----
30 changes: 30 additions & 0 deletions testdata/client.crt
@@ -0,0 +1,30 @@
-----BEGIN CERTIFICATE-----
MIIFGjCCAwICCQDhtWNzYgU8nTANBgkqhkiG9w0BAQsFADBPMQswCQYDVQQGEwJV
UzETMBEGA1UECAwKQ2FsaWZvcm5pYTEWMBQGA1UEBwwNU2FuIEZyYW5jaXNjbzET
MBEGA1UECgwKa2Fma2EtcGl4eTAeFw0yMDAyMTAwNTEzNTZaFw0zMDAyMDcwNTEz
NTZaME8xCzAJBgNVBAYTAlVTMRMwEQYDVQQIDApDYWxpZm9ybmlhMRYwFAYDVQQH
DA1TYW4gRnJhbmNpc2NvMRMwEQYDVQQKDAprYWZrYS1waXh5MIICIjANBgkqhkiG
9w0BAQEFAAOCAg8AMIICCgKCAgEAwyUn8GKUvuNw2gw2yHgwYp+xANojzjHbV8db
kwkk51Qwc4HntZwJLPnzmgtiWW9QTCgFfC251EZ2si8rxV098YX3b2q1heq/uIKB
5rK1oA9/0A80NXsodUrMgPIHO7JP+aBe43RYdGdojLwX3NLmtNVn4+BPEuW8jjin
dsr2DYsICA8ML7pUWvTo3xIKUcuA4mAY8Bewb3crxo0uqGHy9KcHAckvC/5p9s6/
O6y5I07b/WZbbn4pK/GLIdi5HM2YC0WhFvy6gq1EDOyaaCVqEaYfN+XWv6XX4KYn
c1xJXpVu3KLxlxdLVGNrRJcu1IOmtuPpj/wgvM7tlao9QVin8j+q0YE/D5GkjOus
edy5vBqBbI+Nx1DXM1aXxBs5Qmbt76tckJjqzRkA6mbrAM84XSem0EexPW2C0m5L
ZgWBQsC9tbzz9QDgEMpqwCMRCoAUn7h9AtlfiG8zoGwTTmRLmSat87xDFKQdpMWd
2iWJN2nmn0mZokNhEaC9TSfvZWJEViG99rzzNMJ3TC69pote2nvkkCfkJsOgPw1l
crfV9k5f3DdN3G9m+I4BoOsvQWWJlUNPJWH6Yr+XxJqIP2kbUpXSHjSPscu0SQqU
HKOnKMZo/4DijXi4V2jn65l3IaOX/hOIM3kIrrczJLvrYlVKrdgKKilVC2pSIuKx
Mcrx/+0CAwEAATANBgkqhkiG9w0BAQsFAAOCAgEAH604c4av9fGvu3VBeuUMdRcG
WXrI8S5XLkXTDYPP2s3NGPhGTgyoc86Dr6OCc4F2PmR2uu23Ylfk1TrU/WhingAL
R5U6eSYe8PF78jqA9w+isZzmDXbvXadY7mVWldWEMkVkHo0lGRTa7N0VQPEcATcU
JSzH6tFw4RGtG4JOpxjHlPDKV0daZmE2hH5W4CuiFrwqCWAJDzDtZultVnm7PpnP
wjie+xbovKBSjq8jhh8frU6Us0dLCalp9SZ1gmfjW2z4p2zdsKuQsWpedUu7xk7s
2SNhr62XoAEDrm2wGpe4b9o6zfe3ZIWJeK/jy01B+gA24Tx37IwxuUYpQdYREfQm
SWnOg4oDjR5CmNNuO4oMCu5oagDW9Zok4ep1650HAaUTBM2RlisECpykOf+BRnmo
LvpJKCqj6eBIrjW4WmE15AP4Nl43UCKTvJv1I6uHiOp2b7zynh1TojfKJg14nLVQ
IEWjwPYw5uKNtQYEBRMvFSqNXqP+mr8et6CtJ7oa5br5S9Xoy446KHiC1wNU2M1Q
TjNN0c6wUcK7Q1KSLbiajm02xqGErP+gRfK4tQzNdRPUo2mm+VWCtHW10b32i4rW
ZhvHA1Hlb+c2xmGGSdSJPnPMxLmbD4lECNs5XZWIon+czEGMQdbhIFKsRRLlLmUb
c5Ww8zWb3j691chmTSI=
-----END CERTIFICATE-----

0 comments on commit 91fc2e6

Please sign in to comment.