Skip to content

Commit

Permalink
Merge a1425e3 into ff2cb39
Browse files Browse the repository at this point in the history
  • Loading branch information
jantebeest committed Dec 18, 2019
2 parents ff2cb39 + a1425e3 commit 549cb72
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 35 deletions.
62 changes: 62 additions & 0 deletions core/internal/helpers/zookeeper.go
Expand Up @@ -11,6 +11,12 @@
package helpers

import (
"crypto/tls"
"crypto/x509"
"github.com/pkg/errors"
"github.com/spf13/viper"
"io/ioutil"
"net"
"time"

"github.com/linkedin/Burrow/core/protocol"
Expand Down Expand Up @@ -38,6 +44,62 @@ func ZookeeperConnect(servers []string, sessionTimeout time.Duration, logger *za
return &BurrowZookeeperClient{client: zkconn}, connEventChan, err
}

// ZookeeperConnectTLS establishes a new TLS connection to a pool of Zookeeper servers. The provided session timeout sets the
// amount of time for which a session is considered valid after losing connection to a server. Within the session
// timeout it's possible to reestablish a connection to a different server and keep the same session. This is means any
// ephemeral nodes and watches are maintained. The certificates are read from the configured zookeeper.tls profile.
func ZookeeperConnectTLS(servers []string, sessionTimeout time.Duration, logger *zap.Logger) (protocol.ZookeeperClient, <-chan zk.Event, error) {
tlsName := viper.GetString("zookeeper.tls")
caFile := viper.GetString("tls." + tlsName + ".cafile")
certFile := viper.GetString("tls." + tlsName + ".certfile")
keyFile := viper.GetString("tls." + tlsName + ".keyfile")

logger.Info("starting zookeeper (TLS)", zap.String("caFile", caFile), zap.String("certFile", certFile), zap.String("keyFile", keyFile))

dialer, err := newTLSDialer(servers[0], caFile, certFile, keyFile)
if err != nil {
return nil, nil, err
}

// We need a function to set the logger for the ZK connection
zkSetLogger := func(c *zk.Conn) {
c.SetLogger(zap.NewStdLog(logger))
}

zkconn, connEventChan, err := zk.Connect(servers, sessionTimeout, zk.WithDialer(dialer), zkSetLogger)
return &BurrowZookeeperClient{client: zkconn}, connEventChan, err
}

// newTLSDialer creates a dialer with TLS configured. It will install caFile as root CA and if both certFile and keyFile are
// set, it will add those as a certificate.
func newTLSDialer(addr string, caFile, certFile, keyFile string) (zk.Dialer, error) {
caCert, err := ioutil.ReadFile(caFile)
if err != nil {
return nil, errors.New("could not read caFile: " + err.Error())
}

caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
return nil, errors.New("failed to add root certificate")
}

tlsConfig := &tls.Config{
RootCAs: caCertPool,
}

if len(certFile) > 0 && len(keyFile) > 0 {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, errors.New("cannot read TLS certificate or key file: " + err.Error())
}
tlsConfig.Certificates = []tls.Certificate{cert}
}

return func(string, string, time.Duration) (net.Conn, error) {
return tls.Dial("tcp", addr, tlsConfig)
}, nil
}

// Close shuts down the connection to the Zookeeper ensemble.
func (z *BurrowZookeeperClient) Close() {
z.client.Close()
Expand Down
5 changes: 4 additions & 1 deletion core/internal/zookeeper/coordinator.go
Expand Up @@ -49,7 +49,10 @@ type Coordinator struct {
func (zc *Coordinator) Configure() {
zc.Log.Info("configuring")

if zc.connectFunc == nil {
// if zookeeper.tls has been set, use the TLS connect function otherwise use default connect
if zc.connectFunc == nil && viper.IsSet("zookeeper.tls") {
zc.connectFunc = helpers.ZookeeperConnectTLS
} else if zc.connectFunc == nil {
zc.connectFunc = helpers.ZookeeperConnect
}

Expand Down
28 changes: 15 additions & 13 deletions go.mod
@@ -1,40 +1,42 @@
module github.com/linkedin/Burrow

require (
github.com/OneOfOne/xxhash v1.2.5
github.com/Shopify/sarama v1.24.0
github.com/OneOfOne/xxhash v1.2.6
github.com/Shopify/sarama v1.24.1
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/frankban/quicktest v1.5.0 // indirect
github.com/google/uuid v1.1.1 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/julienschmidt/httprouter v1.3.0
github.com/karrick/goswarm v1.10.0
github.com/klauspost/compress v1.8.6 // indirect
github.com/klauspost/cpuid v1.2.1 // indirect
github.com/magiconair/properties v1.8.1 // indirect
github.com/klauspost/compress v1.9.4 // indirect
github.com/pborman/uuid v1.2.0
github.com/pelletier/go-toml v1.5.0 // indirect
github.com/pelletier/go-toml v1.6.0 // indirect
github.com/pierrec/lz4 v2.3.0+incompatible // indirect
github.com/pkg/errors v0.8.1 // indirect
github.com/pkg/errors v0.8.1
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 // indirect
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.4.0
github.com/spf13/viper v1.6.1
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.4.0
go.uber.org/multierr v1.2.0 // indirect
go.uber.org/zap v1.10.0
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 // indirect
golang.org/x/net v0.0.0-20191011234655-491137f69257 // indirect
golang.org/x/sys v0.0.0-20191010194322-b09406accb47 // indirect
go.uber.org/atomic v1.5.1 // indirect
go.uber.org/multierr v1.4.0 // indirect
go.uber.org/zap v1.13.0
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 // indirect
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f // indirect
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553 // indirect
golang.org/x/sys v0.0.0-20191210023423-ac6580df4449 // indirect
golang.org/x/text v0.3.2 // indirect
golang.org/x/tools v0.0.0-20191217033636-bbbf87ae2631 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v2 v2.2.7 // indirect
)

go 1.13

0 comments on commit 549cb72

Please sign in to comment.