diff --git a/.travis.yml b/.travis.yml index 60a82c0d..799c8de0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -48,8 +48,12 @@ matrix: env: ZOOKEEPER_VERSION=3.3.6 TOX_VENV=py36 - python: '3.6' env: ZOOKEEPER_VERSION=3.4.13 TOX_VENV=py36 DEPLOY=true + - python: '3.6' + env: ZOOKEEPER_VERSION=3.4.13 TOX_VENV=py36-sasl - python: '3.6' env: ZOOKEEPER_VERSION=3.5.4-beta TOX_VENV=py36 + - python: '3.6' + env: ZOOKEEPER_VERSION=3.5.4-beta TOX_VENV=py36-sasl - python: pypy env: ZOOKEEPER_VERSION=3.3.6 TOX_VENV=pypy - python: pypy diff --git a/ensure-zookeeper-env.sh b/ensure-zookeeper-env.sh index c75c90f3..1a98c3b7 100755 --- a/ensure-zookeeper-env.sh +++ b/ensure-zookeeper-env.sh @@ -29,5 +29,4 @@ cd $HERE # Yield execution to venv command -$* - +exec $* diff --git a/kazoo/client.py b/kazoo/client.py index 0a500183..2258c021 100644 --- a/kazoo/client.py +++ b/kazoo/client.py @@ -70,7 +70,7 @@ LOST_STATES = (KeeperState.EXPIRED_SESSION, KeeperState.AUTH_FAILED, KeeperState.CLOSED) -ENVI_VERSION = re.compile('([\d\.]*).*', re.DOTALL) +ENVI_VERSION = re.compile(r'([\d\.]*).*', re.DOTALL) ENVI_VERSION_KEY = 'zookeeper.version' log = logging.getLogger(__name__) @@ -102,8 +102,8 @@ class KazooClient(object): """ def __init__(self, hosts='127.0.0.1:2181', timeout=10.0, client_id=None, handler=None, - default_acl=None, auth_data=None, read_only=None, - randomize_hosts=True, connection_retry=None, + default_acl=None, auth_data=None, sasl_options=None, + read_only=None, randomize_hosts=True, connection_retry=None, command_retry=None, logger=None, keyfile=None, keyfile_password=None, certfile=None, ca=None, use_ssl=False, verify_certs=True, **kwargs): @@ -123,6 +123,31 @@ def __init__(self, hosts='127.0.0.1:2181', A list of authentication credentials to use for the connection. Should be a list of (scheme, credential) tuples as :meth:`add_auth` takes. + :param sasl_options: + SASL options for the connection, if SASL support is to be used. + Should be a dict of SASL options passed to the underlying + `pure-sasl `_ library. + + For example using the DIGEST-MD5 mechnism: + + .. code-block:: python + + sasl_options = { + 'mechanism': 'DIGEST-MD5', + 'username': 'myusername', + 'password': 'mypassword' + } + + For GSSAPI, using the running process' ticket cache: + + .. code-block:: python + + sasl_options = { + 'mechanism': 'GSSAPI', + 'service': 'myzk', # optional + 'principal': 'client@EXAMPLE.COM' # optional + } + :param read_only: Allow connections to read only servers. :param randomize_hosts: By default randomize host selection. :param connection_retry: @@ -174,6 +199,9 @@ def __init__(self, hosts='127.0.0.1:2181', .. versionadded:: 1.2 The connection_retry, command_retry and logger options. + .. versionadded:: 2.7 + The sasl_options option. + """ self.logger = logger or log @@ -273,9 +301,39 @@ def __init__(self, hosts='127.0.0.1:2181', sleep_func=self.handler.sleep_func, **retry_keys) + # Managing legacy SASL options + for scheme, auth in self.auth_data: + if scheme != 'sasl': + continue + if sasl_options: + raise ConfigurationError( + 'Multiple SASL configurations provided' + ) + warnings.warn( + 'Passing SASL configuration as part of the auth_data is ' + 'deprecated, please use the sasl_options configuration ' + 'instead', DeprecationWarning, stacklevel=2 + ) + username, password = auth.split(':') + # Generate an equivalent SASL configuration + sasl_options = { + 'username': username, + 'password': password, + 'mechanism': 'DIGEST-MD5', + 'service': 'zookeeper', + 'principal': 'zk-sasl-md5', + } + # Cleanup + self.auth_data = set([ + (scheme, auth) + for scheme, auth in self.auth_data + if scheme != 'sasl' + ]) + self._conn_retry.interrupt = lambda: self._stopped.is_set() self._connection = ConnectionHandler( - self, self._conn_retry.copy(), logger=self.logger) + self, self._conn_retry.copy(), logger=self.logger, + sasl_options=sasl_options) # Every retry call should have its own copy of the retry helper # to avoid shared retry counts @@ -303,15 +361,6 @@ def _retry(*args, **kwargs): self.Semaphore = partial(Semaphore, self) self.ShallowParty = partial(ShallowParty, self) - # Managing SASL client - self.use_sasl = False - for scheme, auth in self.auth_data: - if scheme == "sasl": - self.use_sasl = True - # Could be used later for GSSAPI implementation - self.sasl_server_principal = "zk-sasl-md5" - break - # If we got any unhandled keywords, complain like Python would if kwargs: raise TypeError('__init__() got unexpected keyword arguments: %s' @@ -560,7 +609,7 @@ def _call(self, request, async_object): "Connection has been closed")) try: write_sock.send(b'\0') - except: + except: # NOQA async_object.set_exception(ConnectionClosedError( "Connection has been closed")) @@ -737,12 +786,8 @@ def add_auth(self, scheme, credential): """Send credentials to server. :param scheme: authentication scheme (default supported: - "digest", "sasl"). Note that "sasl" scheme is - requiring "pure-sasl" library to be - installed. + "digest"). :param credential: the credential -- value depends on scheme. - "digest": user:password - "sasl": user:password :returns: True if it was successful. :rtype: bool diff --git a/kazoo/exceptions.py b/kazoo/exceptions.py index eeefe495..69f959e2 100644 --- a/kazoo/exceptions.py +++ b/kazoo/exceptions.py @@ -43,6 +43,13 @@ class WriterNotClosedException(KazooException): """ +class SASLException(KazooException): + """Raised if SASL encountered a (local) error. + + .. versionadded:: 2.7.0 + """ + + def _invalid_error_code(): raise RuntimeError('Invalid error code') diff --git a/kazoo/protocol/connection.py b/kazoo/protocol/connection.py index 67d57899..b4320f24 100644 --- a/kazoo/protocol/connection.py +++ b/kazoo/protocol/connection.py @@ -9,12 +9,15 @@ import sys import time +import six + from kazoo.exceptions import ( AuthFailedError, ConnectionDropped, EXCEPTIONS, SessionExpiredError, - NoNodeError + NoNodeError, + SASLException, ) from kazoo.loggingsupport import BLATHER from kazoo.protocol.serialization import ( @@ -30,7 +33,7 @@ SASL, Transaction, Watch, - int_struct + int_struct, ) from kazoo.protocol.states import ( Callback, @@ -40,10 +43,12 @@ ) from kazoo.retry import ( ForceRetryError, - RetryFailedError + RetryFailedError, ) + try: - from puresasl.client import SASLClient + import puresasl + import puresasl.client PURESASL_AVAILABLE = True except ImportError: PURESASL_AVAILABLE = False @@ -139,7 +144,7 @@ class RWServerAvailable(Exception): class ConnectionHandler(object): """Zookeeper connection handler""" - def __init__(self, client, retry_sleeper, logger=None): + def __init__(self, client, retry_sleeper, logger=None, sasl_options=None): self.client = client self.handler = client.handler self.retry_sleeper = retry_sleeper @@ -159,10 +164,10 @@ def __init__(self, client, retry_sleeper, logger=None): self._xid = None self._rw_server = None self._ro_mode = False - self._ro = False self._connection_routine = None + self.sasl_options = sasl_options self.sasl_cli = None # This is instance specific to avoid odd thread bug issues in Python @@ -232,8 +237,8 @@ def _read(self, length, timeout): # have anything to select, but the wrapped object may still # have something to read as it has previously gotten enough # data from the underlying socket. - if (hasattr(self._socket, "pending") - and self._socket.pending() > 0): + if (hasattr(self._socket, "pending") and + self._socket.pending() > 0): pass else: s = self.handler.select([self._socket], [], [], timeout)[0] @@ -427,24 +432,6 @@ def _read_socket(self, read_timeout): async_object.set(True) elif header.xid == WATCH_XID: self._read_watch_event(buffer, offset) - elif self.sasl_cli and not self.sasl_cli.complete: - # SASL authentication is not yet finished, this can only - # be a SASL packet - self.logger.log(BLATHER, 'Received SASL') - try: - challenge, _ = SASL.deserialize(buffer, offset) - except Exception: - raise ConnectionDropped('error while SASL authentication.') - response = self.sasl_cli.process(challenge) - if response: - # authentication not yet finished, answering the challenge - self._send_sasl_request(challenge=response, - timeout=client._session_timeout) - else: - # authentication is ok, state is CONNECTED or CONNECTED_RO - # remove sensible information from the object - self._set_connected_ro_or_rw(client) - self.sasl_cli.dispose() else: self.logger.log(BLATHER, 'Reading for header %r', header) @@ -522,12 +509,13 @@ def _expand_client_hosts(self): host_ports = [] for host, port in self.client.hosts: try: - for rhost in socket.getaddrinfo(host.strip(), port, 0, 0, + host = host.strip() + for rhost in socket.getaddrinfo(host, port, 0, 0, socket.IPPROTO_TCP): - host_ports.append((rhost[4][0], rhost[4][1])) + host_ports.append((host, rhost[4][0], rhost[4][1])) except socket.gaierror as e: # Skip hosts that don't resolve - self.logger.warning("Cannot resolve %s: %s", host.strip(), e) + self.logger.warning("Cannot resolve %s: %s", host, e) pass if self.client.randomize_hosts: random.shuffle(host_ports) @@ -542,11 +530,11 @@ def _connect_loop(self, retry): if len(host_ports) == 0: return STOP_CONNECTING - for host, port in host_ports: + for host, hostip, port in host_ports: if self.client._stopped.is_set(): status = STOP_CONNECTING break - status = self._connect_attempt(host, port, retry) + status = self._connect_attempt(host, hostip, port, retry) if status is STOP_CONNECTING: break @@ -555,7 +543,7 @@ def _connect_loop(self, retry): else: raise ForceRetryError('Reconnecting') - def _connect_attempt(self, host, port, retry): + def _connect_attempt(self, host, hostip, port, retry): client = self.client KazooTimeoutError = self.handler.timeout_exception close_connection = False @@ -574,7 +562,7 @@ def _connect_attempt(self, host, port, retry): try: self._xid = 0 - read_timeout, connect_timeout = self._connect(host, port) + read_timeout, connect_timeout = self._connect(host, hostip, port) read_timeout = read_timeout / 1000.0 connect_timeout = connect_timeout / 1000.0 retry.reset() @@ -611,9 +599,9 @@ def _connect_attempt(self, host, port, retry): if client._state != KeeperState.CONNECTING: self.logger.warning("Transition to CONNECTING") client._session_callback(KeeperState.CONNECTING) - except AuthFailedError: + except AuthFailedError as err: retry.reset() - self.logger.warning('AUTH_FAILED closing') + self.logger.warning('AUTH_FAILED closing: %s', err) client._session_callback(KeeperState.AUTH_FAILED) return STOP_CONNECTING except SessionExpiredError: @@ -631,10 +619,10 @@ def _connect_attempt(self, host, port, retry): if self._socket is not None: self._socket.close() - def _connect(self, host, port): + def _connect(self, host, hostip, port): client = self.client - self.logger.info('Connecting to %s:%s, use_ssl: %r', - host, port, self.client.use_ssl) + self.logger.info('Connecting to %s(%s):%s, use_ssl: %r', + host, hostip, port, self.client.use_ssl) self.logger.log(BLATHER, ' Using session_id: %r session_passwd: %s', @@ -643,7 +631,7 @@ def _connect(self, host, port): with self._socket_error_handling(): self._socket = self.handler.create_connection( - address=(host, port), + address=(hostip, port), timeout=client._session_timeout / 1000.0, use_ssl=self.client.use_ssl, keyfile=self.client.keyfile, @@ -686,68 +674,112 @@ def _connect(self, host, port): read_timeout) if connect_result.read_only: - self._ro = True + client._session_callback(KeeperState.CONNECTED_RO) + self._ro_mode = iter(self._server_pinger()) + else: + client._session_callback(KeeperState.CONNECTED) + self._ro_mode = None + + if self.sasl_options is not None: + self._authenticate_with_sasl(host, connect_timeout / 1000.0) # Get a copy of the auth data before iterating, in case it is # changed. client_auth_data_copy = copy.copy(client.auth_data) - if client.use_sasl and self.sasl_cli is None: - if PURESASL_AVAILABLE: - for scheme, auth in client_auth_data_copy: - if scheme == 'sasl': - username, password = auth.split(":") - self.sasl_cli = SASLClient( - host=client.sasl_server_principal, - service='zookeeper', - mechanism='DIGEST-MD5', - username=username, - password=password - ) - break - - # As described in rfc - # https://tools.ietf.org/html/rfc2831#section-2.1 - # sending empty challenge - self._send_sasl_request(challenge=b'', - timeout=connect_timeout) - else: - self.logger.warn('Pure-sasl library is missing while sasl' - ' authentification is configured. Please' - ' install pure-sasl library to connect ' - 'using sasl. Now falling back ' - 'connecting WITHOUT any ' - 'authentification.') - client.use_sasl = False - self._set_connected_ro_or_rw(client) - else: - self._set_connected_ro_or_rw(client) - for scheme, auth in client_auth_data_copy: - if scheme == "digest": - ap = Auth(0, scheme, auth) - zxid = self._invoke( - connect_timeout / 1000.0, - ap, - xid=AUTH_XID - ) - if zxid: - client.last_zxid = zxid + for scheme, auth in client_auth_data_copy: + ap = Auth(0, scheme, auth) + zxid = self._invoke(connect_timeout / 1000.0, ap, xid=AUTH_XID) + if zxid: + client.last_zxid = zxid return read_timeout, connect_timeout - def _send_sasl_request(self, challenge, timeout): - """ Called when sending a SASL request, xid needs be to incremented """ - sasl_request = SASL(challenge) - self._xid = (self._xid % 2147483647) + 1 - xid = self._xid - self._submit(sasl_request, timeout / 1000.0, xid) - - def _set_connected_ro_or_rw(self, client): - """ Called to decide whether to set the KeeperState to CONNECTED_RO - or CONNECTED""" - if self._ro: - client._session_callback(KeeperState.CONNECTED_RO) - self._ro_mode = iter(self._server_pinger()) - else: - client._session_callback(KeeperState.CONNECTED) - self._ro_mode = None + def _authenticate_with_sasl(self, host, timeout): + """Establish a SASL authenticated connection to the server. + """ + if not PURESASL_AVAILABLE: + raise SASLException('Missing SASL support') + + if 'service' not in self.sasl_options: + self.sasl_options['service'] = 'zookeeper' + + # NOTE: Zookeeper hardcoded the domain for Digest authentication + # instead of using the hostname. See + # zookeeper/util/SecurityUtils.java#L74 and Server/Client + # initializations. + if self.sasl_options['mechanism'] == 'DIGEST-MD5': + host = 'zk-sasl-md5' + + sasl_cli = self.client.sasl_cli = puresasl.client.SASLClient( + host=host, + **self.sasl_options + ) + + # Inititalize the process with an empty challenge token + challenge = None + xid = 0 + + while True: + if sasl_cli.complete: + break + + try: + response = sasl_cli.process(challenge=challenge) + except puresasl.SASLError as err: + six.reraise( + SASLException, + SASLException('library error: %s' % err.message), + sys.exc_info()[2] + ) + except puresasl.SASLProtocolException as err: + six.reraise( + AuthFailedError, + AuthFailedError('protocol error: %s' % err.message), + sys.exc_info()[2] + ) + except Exception as err: + six.reraise( + AuthFailedError, + AuthFailedError('Unknown error: %s' % err), + sys.exc_info()[2] + ) + + if sasl_cli.complete and not response: + break + elif response is None: + response = b'' + + xid = (xid % 2147483647) + 1 + + request = SASL(response) + self._submit(request, timeout, xid) + + try: + header, buffer, offset = self._read_header(timeout) + except ConnectionDropped: + # Zookeeper simply drops connections with failed authentication + six.reraise( + AuthFailedError, + AuthFailedError('Connection dropped in SASL'), + sys.exc_info()[2] + ) + + if header.xid != xid: + raise RuntimeError('xids do not match, expected %r ' + 'received %r', xid, header.xid) + + if header.zxid > 0: + self.client.last_zxid = header.zxid + + if header.err: + callback_exception = EXCEPTIONS[header.err]() + self.logger.debug( + 'Received error(xid=%s) %r', xid, callback_exception) + raise callback_exception + + challenge, _ = SASL.deserialize(buffer, offset) + + # If we made it here, authentication is ok, and we are connected. + # Remove sensible information from the object. + sasl_cli.dispose() diff --git a/kazoo/protocol/serialization.py b/kazoo/protocol/serialization.py index 75c6abe4..fa5c67a3 100644 --- a/kazoo/protocol/serialization.py +++ b/kazoo/protocol/serialization.py @@ -391,6 +391,7 @@ def deserialize(cls, bytes, offset): challenge, offset = read_buffer(bytes, offset) return challenge, offset + class Watch(namedtuple('Watch', 'type state path')): @classmethod def deserialize(cls, bytes, offset): diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py index e988fdb1..9f0f9d45 100644 --- a/kazoo/tests/test_client.py +++ b/kazoo/tests/test_client.py @@ -188,14 +188,54 @@ def test_connect_sasl_auth(self): version = self.client.server_version() if not version or version < (3, 4): raise SkipTest("Must use Zookeeper 3.4 or above") + try: + import puresasl # NOQA + except ImportError: + raise SkipTest('PureSASL not available.') username = "jaasuser" password = "jaas_password" - sasl_auth = "%s:%s" % (username, password) acl = make_acl('sasl', credential=username, all=True) + client = self._get_client( + sasl_options={'mechanism': 'DIGEST-MD5', + 'username': username, + 'password': password} + ) + client.start() + try: + client.create('/1', acl=(acl,)) + # give ZK a chance to copy data to other node + time.sleep(0.1) + self.assertRaises(NoAuthError, self.client.get, "/1") + finally: + client.delete('/1') + client.stop() + client.close() + + def test_connect_sasl_auth_leg(self): + from kazoo.security import make_acl + + if TRAVIS_ZK_VERSION: + version = TRAVIS_ZK_VERSION + else: + version = self.client.server_version() + if not version or version < (3, 4): + raise SkipTest("Must use Zookeeper 3.4 or above") + try: + import puresasl # NOQA + except ImportError: + raise SkipTest('PureSASL not available.') + + username = "jaasuser" + password = "jaas_password" + + sasl_auth = "%s:%s" % (username, password) + + acl = make_acl('sasl', credential=username, all=True) client = self._get_client(auth_data=[('sasl', sasl_auth)]) + client.start() try: client.create('/1', acl=(acl,)) @@ -252,8 +292,30 @@ def test_invalid_sasl_auth(self): version = self.client.server_version() if not version or version < (3, 4): raise SkipTest("Must use Zookeeper 3.4 or above") + try: + import puresasl # NOQA + except ImportError: + raise SkipTest('PureSASL not available.') + client = self._get_client( + sasl_options={'mechanism': 'DIGEST-MD5', + 'username': 'baduser', + 'password': 'badpassword'} + ) + self.assertRaises(AuthFailedError, client.start) + + def test_invalid_sasl_auth_leg(self): + if TRAVIS_ZK_VERSION: + version = TRAVIS_ZK_VERSION + else: + version = self.client.server_version() + if not version or version < (3, 4): + raise SkipTest("Must use Zookeeper 3.4 or above") + try: + import puresasl # NOQA + except ImportError: + raise SkipTest('PureSASL not available.') client = self._get_client(auth_data=[('sasl', 'baduser:badpassword')]) - self.assertRaises(ConnectionLoss, client.start) + self.assertRaises(AuthFailedError, client.start) def test_async_auth(self): client = self._get_client() diff --git a/requirements.txt b/requirements.txt index 2d5c0c68..f3854685 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,5 @@ coverage==3.7.1 mock==1.0.1 nose==1.3.3 -pure-sasl==0.5.1 flake8==2.3.0 objgraph==3.4.0 diff --git a/requirements_sasl.txt b/requirements_sasl.txt new file mode 100644 index 00000000..3fb3416f --- /dev/null +++ b/requirements_sasl.txt @@ -0,0 +1 @@ +pure_sasl==0.5.1 diff --git a/setup.py b/setup.py index ee055d53..1d7c64b9 100644 --- a/setup.py +++ b/setup.py @@ -24,7 +24,6 @@ 'mock', 'nose', 'flake8', - 'pure-sasl', 'objgraph', ] @@ -39,6 +38,7 @@ install_requires += [ 'gevent>=1.2', 'eventlet>=0.17.1', + 'pure-sasl', ] setup( @@ -77,6 +77,7 @@ tests_require=tests_require, extras_require={ 'test': tests_require, + 'sasl': ['pure-sasl'], }, long_description_content_type="text/markdown", ) diff --git a/tox.ini b/tox.ini index cd4ff27d..0d9783b5 100644 --- a/tox.ini +++ b/tox.ini @@ -4,11 +4,13 @@ skipsdist = True envlist = pep8, py27, - py27-gevent, - py27-eventlet, + py27-{gevent,eventlet,sasl}, py34, + py34-sasl, py35, + py35-sasl, py36, + py36-{sasl,docs}, pypy [testenv:pep8] @@ -21,17 +23,12 @@ setenv = VIRTUAL_ENV={envdir} ZOOKEEPER_VERSION={env:ZOOKEEPER_VERSION:} deps = -r{toxinidir}/requirements.txt - -r{toxinidir}/requirements_sphinx.txt + docs: -r{toxinidir}/requirements_sphinx.txt + gevent: -r{toxinidir}/requirements_gevent.txt + sasl: -r{toxinidir}/requirements_sasl.txt + eventlet: -r{toxinidir}/requirements_eventlet.txt commands = {toxinidir}/ensure-zookeeper-env.sh nosetests {posargs: -d -v --with-coverage kazoo.tests} -[testenv:py27-gevent] -deps = {[testenv]deps} - -r{toxinidir}/requirements_gevent.txt - -[testenv:py27-eventlet] -deps = {[testenv]deps} - -r{toxinidir}/requirements_eventlet.txt - [flake8] builtins = _ exclude = .venv,.tox,dist,doc,*egg,.git,build,tools,local,docs,zookeeper