Skip to content
This repository has been archived by the owner on Jul 21, 2021. It is now read-only.

Commit

Permalink
Merge 368e294 into 2cc03de
Browse files Browse the repository at this point in the history
  • Loading branch information
linalinn committed Dec 11, 2019
2 parents 2cc03de + 368e294 commit 7f01beb
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 56 deletions.
30 changes: 28 additions & 2 deletions Makefile
@@ -1,9 +1,12 @@
# make file to hold the logic of build and test setup
ZK_VERSION ?= 3.4.12
ZK_VERSION ?= 3.3.3

ZK = zookeeper-$(ZK_VERSION)
ZK_URL = "https://archive.apache.org/dist/zookeeper/$(ZK)/$(ZK).tar.gz"

tls_passwd = password
tls_dir = "/tmp/certs"

PACKAGES := $(shell go list ./... | grep -v examples)

.DEFAULT_GOAL := test
Expand All @@ -21,7 +24,8 @@ install-covertools:
go get golang.org/x/tools/cmd/cover

.PHONY: setup
setup: $(ZK) install-covertools
setup: certs $(ZK) install-covertools


.PHONY: lint
lint:
Expand All @@ -37,3 +41,25 @@ test: build
go test -timeout 500s -v -race -covermode atomic -coverprofile=profile.cov $(PACKAGES)
# ignore if we fail to publish coverage
-goveralls -coverprofile=profile.cov -service=travis-ci

.PHONY: certs
certs:
mkdir $(tls_dir)
keytool -keystore $(tls_dir)/zookeeper.server.keystore.jks -storepass $(tls_passwd) -alias localhost -validity 10 -genkey -keyalg RSA -dname "CN=localhost, OU=nope, O=nope, L=nope, S=nope, C=NO"
openssl req -passout pass:$(tls_passwd) -new -x509 -keyout $(tls_dir)/ca-key -out $(tls_dir)/ca-cert -days 10 -subj "/CN=localhost/OU=nope/O=nope/L=nope/C=NO"
keytool -keystore $(tls_dir)/zookeeper.client.truststore.jks -alias CARoot -import -file $(tls_dir)/ca-cert -trustcacerts -noprompt -storepass $(tls_passwd)
keytool -keystore $(tls_dir)/zookeeper.server.keystore.jks -alias localhost -certreq -file $(tls_dir)/cert-file -storepass $(tls_passwd)
openssl x509 -req -CA $(tls_dir)/ca-cert -CAkey $(tls_dir)/ca-key -in $(tls_dir)/cert-file -out $(tls_dir)/cert-signed -days 10 -CAcreateserial -passin pass:$(tls_passwd)
keytool -keystore $(tls_dir)/zookeeper.server.keystore.jks -alias CARoot -import -file $(tls_dir)/ca-cert -storepass $(tls_passwd) -trustcacerts -noprompt
keytool -keystore $(tls_dir)/zookeeper.server.keystore.jks -alias localhost -import -file $(tls_dir)/cert-signed -storepass $(tls_passwd)
keytool -keystore $(tls_dir)/zookeeper.client.keystore.jks -alias CLIENT -validity 10 -genkey -keyalg rsa -storepass $(tls_passwd) -dname "CN=localhost, OU=nope, O=nope, L=nope, S=nope, C=NO"
keytool -keystore $(tls_dir)/zookeeper.client.keystore.jks -alias CLIENT -certreq -file $(tls_dir)/client-cert-file -storepass $(tls_passwd)
openssl x509 -req -CA $(tls_dir)/ca-cert -CAkey $(tls_dir)/ca-key -in $(tls_dir)/client-cert-file -out $(tls_dir)/client-cert-signed -days 10 -CAcreateserial -passin pass:$(tls_passwd)
keytool -keystore $(tls_dir)/zookeeper.client.keystore.jks -alias CARoot -import -file $(tls_dir)/ca-cert -storepass $(tls_passwd) -trustcacerts -noprompt
keytool -keystore $(tls_dir)/zookeeper.client.keystore.jks -alias CLIENT -import -file $(tls_dir)/client-cert-signed -storepass $(tls_passwd) -trustcacerts -noprompt
keytool -keystore $(tls_dir)/zookeeper.server.truststore.jks -alias CARoot -import -file $(tls_dir)/ca-cert -storepass $(tls_passwd) -trustcacerts -noprompt
keytool -storepass $(tls_passwd) -srcstorepass $(tls_passwd) -importkeystore -srckeystore $(tls_dir)/zookeeper.server.truststore.jks -destkeystore $(tls_dir)/server.p12 -deststoretype PKCS12 -noprompt
openssl pkcs12 -in $(tls_dir)/server.p12 -nokeys -out $(tls_dir)/server.cer.pem -passin pass:$(tls_passwd)
keytool -importkeystore -srckeystore $(tls_dir)/zookeeper.server.keystore.jks -destkeystore $(tls_dir)/client.p12 -deststoretype PKCS12 -storepass $(tls_passwd) -srcstorepass $(tls_passwd)
openssl pkcs12 -in $(tls_dir)/client.p12 -nokeys -out $(tls_dir)/client.cer.pem -passin pass:$(tls_passwd)
openssl pkcs12 -in $(tls_dir)/client.p12 -nodes -nocerts -out $(tls_dir)/client.key.pem -passin pass:$(tls_passwd)
120 changes: 85 additions & 35 deletions zk/conn.go
Expand Up @@ -11,6 +11,7 @@ Possible watcher events:

import (
"crypto/rand"
"crypto/tls"
"encoding/binary"
"errors"
"fmt"
Expand Down Expand Up @@ -108,7 +109,9 @@ type Conn struct {
logger Logger
logInfo bool // true if information messages are logged; false if only errors are logged

buf []byte
buf []byte
configTLS *tls.Config
enableTLS bool
}

// connOption represents a connection option.
Expand Down Expand Up @@ -166,30 +169,53 @@ func ConnectWithDialer(servers []string, sessionTimeout time.Duration, dialer Di
return Connect(servers, sessionTimeout, WithDialer(dialer))
}

func ConnectTLS(servers []string, sessionTimeout time.Duration, config *tls.Config, options ...connOption) (*Conn, <-chan Event, error) {
srvs, e := prepareServerArray(servers)
if e != nil {
return nil, nil, e
}

ec, conn, e := prepareConn(options, srvs, sessionTimeout, true, config)
if e != nil {
return nil, nil, e
}

go func() {
conn.loop()
conn.flushRequests(ErrClosing)
conn.invalidateWatches(ErrClosing)
close(conn.eventChan)
}()
return conn, ec, nil
}

// Connect establishes a new connection to a pool of zookeeper
// servers. The provided session timeout sets the amount of time for which
// a session is considered valid after losing connection to a server. Within
// the session timeout it's possible to reestablish a connection to a different
// server and keep the same session. This is means any ephemeral nodes and
// watches are maintained.
func Connect(servers []string, sessionTimeout time.Duration, options ...connOption) (*Conn, <-chan Event, error) {
if len(servers) == 0 {
return nil, nil, errors.New("zk: server list must not be empty")
srvs, e := prepareServerArray(servers)
if e != nil {
return nil, nil, e
}

srvs := make([]string, len(servers))

for i, addr := range servers {
if strings.Contains(addr, ":") {
srvs[i] = addr
} else {
srvs[i] = addr + ":" + strconv.Itoa(DefaultPort)
}
ec, conn, e := prepareConn(options, srvs, sessionTimeout, false, nil)
if e != nil {
return nil, nil, e
}

// Randomize the order of the servers to avoid creating hotspots
stringShuffle(srvs)
go func() {
conn.loop()
conn.flushRequests(ErrClosing)
conn.invalidateWatches(ErrClosing)
close(conn.eventChan)
}()
return conn, ec, nil
}

func prepareConn(options []connOption, srvs []string, sessionTimeout time.Duration, tlsEnabled bool, config *tls.Config) (chan Event, *Conn, error) {
ec := make(chan Event, eventChanSize)
conn := &Conn{
dialer: net.DialTimeout,
Expand All @@ -206,26 +232,35 @@ func Connect(servers []string, sessionTimeout time.Duration, options ...connOpti
logger: DefaultLogger,
logInfo: true, // default is true for backwards compatability
buf: make([]byte, bufferSize),
enableTLS: tlsEnabled,
configTLS: config,
}

// Set provided options.
for _, option := range options {
option(conn)
}

if err := conn.hostProvider.Init(srvs); err != nil {
return nil, nil, err
}

conn.setTimeouts(int32(sessionTimeout / time.Millisecond))
return ec, conn, nil
}

go func() {
conn.loop()
conn.flushRequests(ErrClosing)
conn.invalidateWatches(ErrClosing)
close(conn.eventChan)
}()
return conn, ec, nil
func prepareServerArray(servers []string) ([]string, error) {
if len(servers) == 0 {
return nil, errors.New("zk: server list must not be empty")
}
srvs := make([]string, len(servers))
for i, addr := range servers {
if strings.Contains(addr, ":") {
srvs[i] = addr
} else {
srvs[i] = addr + ":" + strconv.Itoa(DefaultPort)
}
}
// Randomize the order of the servers to avoid creating hotspots
stringShuffle(srvs)
return srvs, nil
}

// WithDialer returns a connection option specifying a non-default Dialer.
Expand Down Expand Up @@ -377,17 +412,31 @@ func (c *Conn) connect() error {
}
}

zkConn, err := c.dialer("tcp", c.Server(), c.connectTimeout)
if err == nil {
c.conn = zkConn
c.setState(StateConnected)
if c.logInfo {
c.logger.Printf("Connected to %s", c.Server())
if c.enableTLS {
dialer := net.Dialer{Timeout: c.connectTimeout}
zkConn, zkConnErr := tls.DialWithDialer(&dialer, "tcp", c.Server(), c.configTLS)
if zkConnErr == nil {
c.conn = zkConn
c.setState(StateConnected)
if c.logInfo {
c.logger.Printf("Connected to %s", c.Server())
}
return nil
}
return nil
c.logger.Printf("Failed to connect to %s: %+v", c.Server(), zkConnErr)
} else {
zkConn, zkConnErr := c.dialer("tcp", c.Server(), c.connectTimeout)
if zkConnErr == nil {
c.conn = zkConn
c.setState(StateConnected)
if c.logInfo {
c.logger.Printf("Connected to %s", c.Server())
}
return nil
}
c.logger.Printf("Failed to connect to %s: %+v", c.Server(), zkConnErr)
}

c.logger.Printf("Failed to connect to %s: %+v", c.Server(), err)
}
}

Expand Down Expand Up @@ -867,7 +916,8 @@ func (c *Conn) recvLoop(conn net.Conn) error {
return err
}

if res.Xid == -1 {
switch {
case res.Xid == -1:
res := &watcherEvent{}
_, err := decodePacket(buf[16:blen], res)
if err != nil {
Expand Down Expand Up @@ -901,11 +951,11 @@ func (c *Conn) recvLoop(conn net.Conn) error {
}
}
c.watchersLock.Unlock()
} else if res.Xid == -2 {
case res.Xid == -2:
// Ping response. Ignore.
} else if res.Xid < 0 {
case res.Xid < 0:
c.logger.Printf("Xid < 0 (%d) but not ping or watcher event", res.Xid)
} else {
default:
if res.Zxid > 0 {
c.lastZxid = res.Zxid
}
Expand Down
70 changes: 70 additions & 0 deletions zk/conn_test.go
@@ -1,6 +1,7 @@
package zk

import (
"crypto/tls"
"io/ioutil"
"testing"
"time"
Expand Down Expand Up @@ -55,3 +56,72 @@ func TestRecurringReAuthHang(t *testing.T) {

<-conn.debugReauthDone
}

func TestStateChangesTLS(t *testing.T) {
t.Skip("Can't determent Zookeeper version")

config, err := newTLSConfig("/tmp/certs/client.cer.pem", "/tmp/certs/client.key.pem")
if err != nil {
panic(err)
}

ts, err := StartTestCluster(t, 1, logWriter{t: t, p: "[ZK] "}, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()

callbackChan := make(chan Event)
f := func(event Event) {
callbackChan <- event
}

zk, eventChan, err := ts.ConnectWithOptionsTLS(15*time.Second, config, WithEventCallback(f))
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}

verifyEventOrder := func(c <-chan Event, expectedStates []State, source string) {
for _, state := range expectedStates {
for {
event, ok := <-c
if !ok {
t.Fatalf("unexpected channel close for %s", source)
}

if event.Type != EventSession {
continue
}

if event.State != state {
t.Fatalf("mismatched state order from %s, expected %v, received %v", source, state, event.State)
}
break
}
}
}

states := []State{StateConnecting, StateConnected, StateHasSession}
verifyEventOrder(callbackChan, states, "callback")
verifyEventOrder(eventChan, states, "event channel")

zk.Close()
verifyEventOrder(callbackChan, []State{StateDisconnected}, "callback")
verifyEventOrder(eventChan, []State{StateDisconnected}, "event channel")
}

func newTLSConfig(clientCertFile, clientKeyFile string) (*tls.Config, error) {
tlsConfig := tls.Config{
InsecureSkipVerify: true,
}

// Load client cert
cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
if err != nil {
return nil, err
}
tlsConfig.Certificates = []tls.Certificate{cert}

tlsConfig.BuildNameToCertificate()
return &tlsConfig, nil
}

0 comments on commit 7f01beb

Please sign in to comment.