From cd49b3fa01136848c5e6bfafb4c241b9704f249d Mon Sep 17 00:00:00 2001 From: Charles-Henri de Boysson Date: Tue, 12 Feb 2019 15:03:43 -0500 Subject: [PATCH] feat(core): improve SASL interface (#546) Move SASL configuration out of auth_data into its own dictionary which exposes more SASL features (e.g. server service name, client principal...). Legacy syntax is still supported for backward compatibilty. Remove SASL from auth_data and place it between 'connection' and 'zookeeper protocol level authentication' to simplify connection logic and bring code in line with the protocol stack (SASL wraps Zookeeper, not the other way around). Consistent exception, `AuthFailedError`, raised during authentication failure between SASL and ZK authentication. New 'SASLException' exception raised in case of SASL intrisinc failures. Add support for GSSAPI (Kerberos). Example connection using Digest-MD5: client = KazooClient( sasl_options={'mechanism': 'DIGEST-MD5', 'username': 'myusername', 'password': 'mypassword'} ) Example connection using GSSAPI (with some optional settings): client = KazooClient( sasl_options={'mechanism': 'GSSAPI', 'service': 'myzk', # optional 'principal': 'clt@EXAMPLE.COM'} # optional ) --- .travis.yml | 4 + ensure-zookeeper-env.sh | 3 +- kazoo/client.py | 83 +++++++++--- kazoo/exceptions.py | 7 + kazoo/protocol/connection.py | 224 ++++++++++++++++++-------------- kazoo/protocol/serialization.py | 1 + kazoo/tests/test_client.py | 66 +++++++++- requirements.txt | 1 - requirements_sasl.txt | 1 + setup.py | 3 +- tox.ini | 19 ++- 11 files changed, 280 insertions(+), 132 deletions(-) create mode 100644 requirements_sasl.txt diff --git a/.travis.yml b/.travis.yml index 60a82c0d..799c8de0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -48,8 +48,12 @@ matrix: env: ZOOKEEPER_VERSION=3.3.6 TOX_VENV=py36 - python: '3.6' env: ZOOKEEPER_VERSION=3.4.13 TOX_VENV=py36 DEPLOY=true + - python: '3.6' + env: ZOOKEEPER_VERSION=3.4.13 TOX_VENV=py36-sasl - python: '3.6' env: ZOOKEEPER_VERSION=3.5.4-beta TOX_VENV=py36 + - python: '3.6' + env: ZOOKEEPER_VERSION=3.5.4-beta TOX_VENV=py36-sasl - python: pypy env: ZOOKEEPER_VERSION=3.3.6 TOX_VENV=pypy - python: pypy diff --git a/ensure-zookeeper-env.sh b/ensure-zookeeper-env.sh index c75c90f3..1a98c3b7 100755 --- a/ensure-zookeeper-env.sh +++ b/ensure-zookeeper-env.sh @@ -29,5 +29,4 @@ cd $HERE # Yield execution to venv command -$* - +exec $* diff --git a/kazoo/client.py b/kazoo/client.py index 0a500183..2258c021 100644 --- a/kazoo/client.py +++ b/kazoo/client.py @@ -70,7 +70,7 @@ LOST_STATES = (KeeperState.EXPIRED_SESSION, KeeperState.AUTH_FAILED, KeeperState.CLOSED) -ENVI_VERSION = re.compile('([\d\.]*).*', re.DOTALL) +ENVI_VERSION = re.compile(r'([\d\.]*).*', re.DOTALL) ENVI_VERSION_KEY = 'zookeeper.version' log = logging.getLogger(__name__) @@ -102,8 +102,8 @@ class KazooClient(object): """ def __init__(self, hosts='127.0.0.1:2181', timeout=10.0, client_id=None, handler=None, - default_acl=None, auth_data=None, read_only=None, - randomize_hosts=True, connection_retry=None, + default_acl=None, auth_data=None, sasl_options=None, + read_only=None, randomize_hosts=True, connection_retry=None, command_retry=None, logger=None, keyfile=None, keyfile_password=None, certfile=None, ca=None, use_ssl=False, verify_certs=True, **kwargs): @@ -123,6 +123,31 @@ def __init__(self, hosts='127.0.0.1:2181', A list of authentication credentials to use for the connection. Should be a list of (scheme, credential) tuples as :meth:`add_auth` takes. + :param sasl_options: + SASL options for the connection, if SASL support is to be used. + Should be a dict of SASL options passed to the underlying + `pure-sasl `_ library. + + For example using the DIGEST-MD5 mechnism: + + .. code-block:: python + + sasl_options = { + 'mechanism': 'DIGEST-MD5', + 'username': 'myusername', + 'password': 'mypassword' + } + + For GSSAPI, using the running process' ticket cache: + + .. code-block:: python + + sasl_options = { + 'mechanism': 'GSSAPI', + 'service': 'myzk', # optional + 'principal': 'client@EXAMPLE.COM' # optional + } + :param read_only: Allow connections to read only servers. :param randomize_hosts: By default randomize host selection. :param connection_retry: @@ -174,6 +199,9 @@ def __init__(self, hosts='127.0.0.1:2181', .. versionadded:: 1.2 The connection_retry, command_retry and logger options. + .. versionadded:: 2.7 + The sasl_options option. + """ self.logger = logger or log @@ -273,9 +301,39 @@ def __init__(self, hosts='127.0.0.1:2181', sleep_func=self.handler.sleep_func, **retry_keys) + # Managing legacy SASL options + for scheme, auth in self.auth_data: + if scheme != 'sasl': + continue + if sasl_options: + raise ConfigurationError( + 'Multiple SASL configurations provided' + ) + warnings.warn( + 'Passing SASL configuration as part of the auth_data is ' + 'deprecated, please use the sasl_options configuration ' + 'instead', DeprecationWarning, stacklevel=2 + ) + username, password = auth.split(':') + # Generate an equivalent SASL configuration + sasl_options = { + 'username': username, + 'password': password, + 'mechanism': 'DIGEST-MD5', + 'service': 'zookeeper', + 'principal': 'zk-sasl-md5', + } + # Cleanup + self.auth_data = set([ + (scheme, auth) + for scheme, auth in self.auth_data + if scheme != 'sasl' + ]) + self._conn_retry.interrupt = lambda: self._stopped.is_set() self._connection = ConnectionHandler( - self, self._conn_retry.copy(), logger=self.logger) + self, self._conn_retry.copy(), logger=self.logger, + sasl_options=sasl_options) # Every retry call should have its own copy of the retry helper # to avoid shared retry counts @@ -303,15 +361,6 @@ def _retry(*args, **kwargs): self.Semaphore = partial(Semaphore, self) self.ShallowParty = partial(ShallowParty, self) - # Managing SASL client - self.use_sasl = False - for scheme, auth in self.auth_data: - if scheme == "sasl": - self.use_sasl = True - # Could be used later for GSSAPI implementation - self.sasl_server_principal = "zk-sasl-md5" - break - # If we got any unhandled keywords, complain like Python would if kwargs: raise TypeError('__init__() got unexpected keyword arguments: %s' @@ -560,7 +609,7 @@ def _call(self, request, async_object): "Connection has been closed")) try: write_sock.send(b'\0') - except: + except: # NOQA async_object.set_exception(ConnectionClosedError( "Connection has been closed")) @@ -737,12 +786,8 @@ def add_auth(self, scheme, credential): """Send credentials to server. :param scheme: authentication scheme (default supported: - "digest", "sasl"). Note that "sasl" scheme is - requiring "pure-sasl" library to be - installed. + "digest"). :param credential: the credential -- value depends on scheme. - "digest": user:password - "sasl": user:password :returns: True if it was successful. :rtype: bool diff --git a/kazoo/exceptions.py b/kazoo/exceptions.py index eeefe495..69f959e2 100644 --- a/kazoo/exceptions.py +++ b/kazoo/exceptions.py @@ -43,6 +43,13 @@ class WriterNotClosedException(KazooException): """ +class SASLException(KazooException): + """Raised if SASL encountered a (local) error. + + .. versionadded:: 2.7.0 + """ + + def _invalid_error_code(): raise RuntimeError('Invalid error code') diff --git a/kazoo/protocol/connection.py b/kazoo/protocol/connection.py index 67d57899..b4320f24 100644 --- a/kazoo/protocol/connection.py +++ b/kazoo/protocol/connection.py @@ -9,12 +9,15 @@ import sys import time +import six + from kazoo.exceptions import ( AuthFailedError, ConnectionDropped, EXCEPTIONS, SessionExpiredError, - NoNodeError + NoNodeError, + SASLException, ) from kazoo.loggingsupport import BLATHER from kazoo.protocol.serialization import ( @@ -30,7 +33,7 @@ SASL, Transaction, Watch, - int_struct + int_struct, ) from kazoo.protocol.states import ( Callback, @@ -40,10 +43,12 @@ ) from kazoo.retry import ( ForceRetryError, - RetryFailedError + RetryFailedError, ) + try: - from puresasl.client import SASLClient + import puresasl + import puresasl.client PURESASL_AVAILABLE = True except ImportError: PURESASL_AVAILABLE = False @@ -139,7 +144,7 @@ class RWServerAvailable(Exception): class ConnectionHandler(object): """Zookeeper connection handler""" - def __init__(self, client, retry_sleeper, logger=None): + def __init__(self, client, retry_sleeper, logger=None, sasl_options=None): self.client = client self.handler = client.handler self.retry_sleeper = retry_sleeper @@ -159,10 +164,10 @@ def __init__(self, client, retry_sleeper, logger=None): self._xid = None self._rw_server = None self._ro_mode = False - self._ro = False self._connection_routine = None + self.sasl_options = sasl_options self.sasl_cli = None # This is instance specific to avoid odd thread bug issues in Python @@ -232,8 +237,8 @@ def _read(self, length, timeout): # have anything to select, but the wrapped object may still # have something to read as it has previously gotten enough # data from the underlying socket. - if (hasattr(self._socket, "pending") - and self._socket.pending() > 0): + if (hasattr(self._socket, "pending") and + self._socket.pending() > 0): pass else: s = self.handler.select([self._socket], [], [], timeout)[0] @@ -427,24 +432,6 @@ def _read_socket(self, read_timeout): async_object.set(True) elif header.xid == WATCH_XID: self._read_watch_event(buffer, offset) - elif self.sasl_cli and not self.sasl_cli.complete: - # SASL authentication is not yet finished, this can only - # be a SASL packet - self.logger.log(BLATHER, 'Received SASL') - try: - challenge, _ = SASL.deserialize(buffer, offset) - except Exception: - raise ConnectionDropped('error while SASL authentication.') - response = self.sasl_cli.process(challenge) - if response: - # authentication not yet finished, answering the challenge - self._send_sasl_request(challenge=response, - timeout=client._session_timeout) - else: - # authentication is ok, state is CONNECTED or CONNECTED_RO - # remove sensible information from the object - self._set_connected_ro_or_rw(client) - self.sasl_cli.dispose() else: self.logger.log(BLATHER, 'Reading for header %r', header) @@ -522,12 +509,13 @@ def _expand_client_hosts(self): host_ports = [] for host, port in self.client.hosts: try: - for rhost in socket.getaddrinfo(host.strip(), port, 0, 0, + host = host.strip() + for rhost in socket.getaddrinfo(host, port, 0, 0, socket.IPPROTO_TCP): - host_ports.append((rhost[4][0], rhost[4][1])) + host_ports.append((host, rhost[4][0], rhost[4][1])) except socket.gaierror as e: # Skip hosts that don't resolve - self.logger.warning("Cannot resolve %s: %s", host.strip(), e) + self.logger.warning("Cannot resolve %s: %s", host, e) pass if self.client.randomize_hosts: random.shuffle(host_ports) @@ -542,11 +530,11 @@ def _connect_loop(self, retry): if len(host_ports) == 0: return STOP_CONNECTING - for host, port in host_ports: + for host, hostip, port in host_ports: if self.client._stopped.is_set(): status = STOP_CONNECTING break - status = self._connect_attempt(host, port, retry) + status = self._connect_attempt(host, hostip, port, retry) if status is STOP_CONNECTING: break @@ -555,7 +543,7 @@ def _connect_loop(self, retry): else: raise ForceRetryError('Reconnecting') - def _connect_attempt(self, host, port, retry): + def _connect_attempt(self, host, hostip, port, retry): client = self.client KazooTimeoutError = self.handler.timeout_exception close_connection = False @@ -574,7 +562,7 @@ def _connect_attempt(self, host, port, retry): try: self._xid = 0 - read_timeout, connect_timeout = self._connect(host, port) + read_timeout, connect_timeout = self._connect(host, hostip, port) read_timeout = read_timeout / 1000.0 connect_timeout = connect_timeout / 1000.0 retry.reset() @@ -611,9 +599,9 @@ def _connect_attempt(self, host, port, retry): if client._state != KeeperState.CONNECTING: self.logger.warning("Transition to CONNECTING") client._session_callback(KeeperState.CONNECTING) - except AuthFailedError: + except AuthFailedError as err: retry.reset() - self.logger.warning('AUTH_FAILED closing') + self.logger.warning('AUTH_FAILED closing: %s', err) client._session_callback(KeeperState.AUTH_FAILED) return STOP_CONNECTING except SessionExpiredError: @@ -631,10 +619,10 @@ def _connect_attempt(self, host, port, retry): if self._socket is not None: self._socket.close() - def _connect(self, host, port): + def _connect(self, host, hostip, port): client = self.client - self.logger.info('Connecting to %s:%s, use_ssl: %r', - host, port, self.client.use_ssl) + self.logger.info('Connecting to %s(%s):%s, use_ssl: %r', + host, hostip, port, self.client.use_ssl) self.logger.log(BLATHER, ' Using session_id: %r session_passwd: %s', @@ -643,7 +631,7 @@ def _connect(self, host, port): with self._socket_error_handling(): self._socket = self.handler.create_connection( - address=(host, port), + address=(hostip, port), timeout=client._session_timeout / 1000.0, use_ssl=self.client.use_ssl, keyfile=self.client.keyfile, @@ -686,68 +674,112 @@ def _connect(self, host, port): read_timeout) if connect_result.read_only: - self._ro = True + client._session_callback(KeeperState.CONNECTED_RO) + self._ro_mode = iter(self._server_pinger()) + else: + client._session_callback(KeeperState.CONNECTED) + self._ro_mode = None + + if self.sasl_options is not None: + self._authenticate_with_sasl(host, connect_timeout / 1000.0) # Get a copy of the auth data before iterating, in case it is # changed. client_auth_data_copy = copy.copy(client.auth_data) - if client.use_sasl and self.sasl_cli is None: - if PURESASL_AVAILABLE: - for scheme, auth in client_auth_data_copy: - if scheme == 'sasl': - username, password = auth.split(":") - self.sasl_cli = SASLClient( - host=client.sasl_server_principal, - service='zookeeper', - mechanism='DIGEST-MD5', - username=username, - password=password - ) - break - - # As described in rfc - # https://tools.ietf.org/html/rfc2831#section-2.1 - # sending empty challenge - self._send_sasl_request(challenge=b'', - timeout=connect_timeout) - else: - self.logger.warn('Pure-sasl library is missing while sasl' - ' authentification is configured. Please' - ' install pure-sasl library to connect ' - 'using sasl. Now falling back ' - 'connecting WITHOUT any ' - 'authentification.') - client.use_sasl = False - self._set_connected_ro_or_rw(client) - else: - self._set_connected_ro_or_rw(client) - for scheme, auth in client_auth_data_copy: - if scheme == "digest": - ap = Auth(0, scheme, auth) - zxid = self._invoke( - connect_timeout / 1000.0, - ap, - xid=AUTH_XID - ) - if zxid: - client.last_zxid = zxid + for scheme, auth in client_auth_data_copy: + ap = Auth(0, scheme, auth) + zxid = self._invoke(connect_timeout / 1000.0, ap, xid=AUTH_XID) + if zxid: + client.last_zxid = zxid return read_timeout, connect_timeout - def _send_sasl_request(self, challenge, timeout): - """ Called when sending a SASL request, xid needs be to incremented """ - sasl_request = SASL(challenge) - self._xid = (self._xid % 2147483647) + 1 - xid = self._xid - self._submit(sasl_request, timeout / 1000.0, xid) - - def _set_connected_ro_or_rw(self, client): - """ Called to decide whether to set the KeeperState to CONNECTED_RO - or CONNECTED""" - if self._ro: - client._session_callback(KeeperState.CONNECTED_RO) - self._ro_mode = iter(self._server_pinger()) - else: - client._session_callback(KeeperState.CONNECTED) - self._ro_mode = None + def _authenticate_with_sasl(self, host, timeout): + """Establish a SASL authenticated connection to the server. + """ + if not PURESASL_AVAILABLE: + raise SASLException('Missing SASL support') + + if 'service' not in self.sasl_options: + self.sasl_options['service'] = 'zookeeper' + + # NOTE: Zookeeper hardcoded the domain for Digest authentication + # instead of using the hostname. See + # zookeeper/util/SecurityUtils.java#L74 and Server/Client + # initializations. + if self.sasl_options['mechanism'] == 'DIGEST-MD5': + host = 'zk-sasl-md5' + + sasl_cli = self.client.sasl_cli = puresasl.client.SASLClient( + host=host, + **self.sasl_options + ) + + # Inititalize the process with an empty challenge token + challenge = None + xid = 0 + + while True: + if sasl_cli.complete: + break + + try: + response = sasl_cli.process(challenge=challenge) + except puresasl.SASLError as err: + six.reraise( + SASLException, + SASLException('library error: %s' % err.message), + sys.exc_info()[2] + ) + except puresasl.SASLProtocolException as err: + six.reraise( + AuthFailedError, + AuthFailedError('protocol error: %s' % err.message), + sys.exc_info()[2] + ) + except Exception as err: + six.reraise( + AuthFailedError, + AuthFailedError('Unknown error: %s' % err), + sys.exc_info()[2] + ) + + if sasl_cli.complete and not response: + break + elif response is None: + response = b'' + + xid = (xid % 2147483647) + 1 + + request = SASL(response) + self._submit(request, timeout, xid) + + try: + header, buffer, offset = self._read_header(timeout) + except ConnectionDropped: + # Zookeeper simply drops connections with failed authentication + six.reraise( + AuthFailedError, + AuthFailedError('Connection dropped in SASL'), + sys.exc_info()[2] + ) + + if header.xid != xid: + raise RuntimeError('xids do not match, expected %r ' + 'received %r', xid, header.xid) + + if header.zxid > 0: + self.client.last_zxid = header.zxid + + if header.err: + callback_exception = EXCEPTIONS[header.err]() + self.logger.debug( + 'Received error(xid=%s) %r', xid, callback_exception) + raise callback_exception + + challenge, _ = SASL.deserialize(buffer, offset) + + # If we made it here, authentication is ok, and we are connected. + # Remove sensible information from the object. + sasl_cli.dispose() diff --git a/kazoo/protocol/serialization.py b/kazoo/protocol/serialization.py index 75c6abe4..fa5c67a3 100644 --- a/kazoo/protocol/serialization.py +++ b/kazoo/protocol/serialization.py @@ -391,6 +391,7 @@ def deserialize(cls, bytes, offset): challenge, offset = read_buffer(bytes, offset) return challenge, offset + class Watch(namedtuple('Watch', 'type state path')): @classmethod def deserialize(cls, bytes, offset): diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py index e988fdb1..9f0f9d45 100644 --- a/kazoo/tests/test_client.py +++ b/kazoo/tests/test_client.py @@ -188,14 +188,54 @@ def test_connect_sasl_auth(self): version = self.client.server_version() if not version or version < (3, 4): raise SkipTest("Must use Zookeeper 3.4 or above") + try: + import puresasl # NOQA + except ImportError: + raise SkipTest('PureSASL not available.') username = "jaasuser" password = "jaas_password" - sasl_auth = "%s:%s" % (username, password) acl = make_acl('sasl', credential=username, all=True) + client = self._get_client( + sasl_options={'mechanism': 'DIGEST-MD5', + 'username': username, + 'password': password} + ) + client.start() + try: + client.create('/1', acl=(acl,)) + # give ZK a chance to copy data to other node + time.sleep(0.1) + self.assertRaises(NoAuthError, self.client.get, "/1") + finally: + client.delete('/1') + client.stop() + client.close() + + def test_connect_sasl_auth_leg(self): + from kazoo.security import make_acl + + if TRAVIS_ZK_VERSION: + version = TRAVIS_ZK_VERSION + else: + version = self.client.server_version() + if not version or version < (3, 4): + raise SkipTest("Must use Zookeeper 3.4 or above") + try: + import puresasl # NOQA + except ImportError: + raise SkipTest('PureSASL not available.') + + username = "jaasuser" + password = "jaas_password" + + sasl_auth = "%s:%s" % (username, password) + + acl = make_acl('sasl', credential=username, all=True) client = self._get_client(auth_data=[('sasl', sasl_auth)]) + client.start() try: client.create('/1', acl=(acl,)) @@ -252,8 +292,30 @@ def test_invalid_sasl_auth(self): version = self.client.server_version() if not version or version < (3, 4): raise SkipTest("Must use Zookeeper 3.4 or above") + try: + import puresasl # NOQA + except ImportError: + raise SkipTest('PureSASL not available.') + client = self._get_client( + sasl_options={'mechanism': 'DIGEST-MD5', + 'username': 'baduser', + 'password': 'badpassword'} + ) + self.assertRaises(AuthFailedError, client.start) + + def test_invalid_sasl_auth_leg(self): + if TRAVIS_ZK_VERSION: + version = TRAVIS_ZK_VERSION + else: + version = self.client.server_version() + if not version or version < (3, 4): + raise SkipTest("Must use Zookeeper 3.4 or above") + try: + import puresasl # NOQA + except ImportError: + raise SkipTest('PureSASL not available.') client = self._get_client(auth_data=[('sasl', 'baduser:badpassword')]) - self.assertRaises(ConnectionLoss, client.start) + self.assertRaises(AuthFailedError, client.start) def test_async_auth(self): client = self._get_client() diff --git a/requirements.txt b/requirements.txt index 2d5c0c68..f3854685 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,5 @@ coverage==3.7.1 mock==1.0.1 nose==1.3.3 -pure-sasl==0.5.1 flake8==2.3.0 objgraph==3.4.0 diff --git a/requirements_sasl.txt b/requirements_sasl.txt new file mode 100644 index 00000000..3fb3416f --- /dev/null +++ b/requirements_sasl.txt @@ -0,0 +1 @@ +pure_sasl==0.5.1 diff --git a/setup.py b/setup.py index ee055d53..1d7c64b9 100644 --- a/setup.py +++ b/setup.py @@ -24,7 +24,6 @@ 'mock', 'nose', 'flake8', - 'pure-sasl', 'objgraph', ] @@ -39,6 +38,7 @@ install_requires += [ 'gevent>=1.2', 'eventlet>=0.17.1', + 'pure-sasl', ] setup( @@ -77,6 +77,7 @@ tests_require=tests_require, extras_require={ 'test': tests_require, + 'sasl': ['pure-sasl'], }, long_description_content_type="text/markdown", ) diff --git a/tox.ini b/tox.ini index cd4ff27d..0d9783b5 100644 --- a/tox.ini +++ b/tox.ini @@ -4,11 +4,13 @@ skipsdist = True envlist = pep8, py27, - py27-gevent, - py27-eventlet, + py27-{gevent,eventlet,sasl}, py34, + py34-sasl, py35, + py35-sasl, py36, + py36-{sasl,docs}, pypy [testenv:pep8] @@ -21,17 +23,12 @@ setenv = VIRTUAL_ENV={envdir} ZOOKEEPER_VERSION={env:ZOOKEEPER_VERSION:} deps = -r{toxinidir}/requirements.txt - -r{toxinidir}/requirements_sphinx.txt + docs: -r{toxinidir}/requirements_sphinx.txt + gevent: -r{toxinidir}/requirements_gevent.txt + sasl: -r{toxinidir}/requirements_sasl.txt + eventlet: -r{toxinidir}/requirements_eventlet.txt commands = {toxinidir}/ensure-zookeeper-env.sh nosetests {posargs: -d -v --with-coverage kazoo.tests} -[testenv:py27-gevent] -deps = {[testenv]deps} - -r{toxinidir}/requirements_gevent.txt - -[testenv:py27-eventlet] -deps = {[testenv]deps} - -r{toxinidir}/requirements_eventlet.txt - [flake8] builtins = _ exclude = .venv,.tox,dist,doc,*egg,.git,build,tools,local,docs,zookeeper