Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add IOLoop auth tests and cleanup

  • Loading branch information...
commit 741b4a4458584301e78420d5780695ae77da3828 1 parent 9b193ec
@claws claws authored
View
2  .gitignore
@@ -16,3 +16,5 @@ docs/gh-pages
setup.cfg
MANIFEST
.tox
+examples/security/public_keys
+examples/security/private_keys
View
25 examples/security/generate_keys.py
@@ -6,6 +6,8 @@
generated by this script are used by the stonehouse and ironhouse examples.
In practice this would be done by hand or some out-of-band process.
+
+Author: Chris Laws
"""
import os
@@ -19,19 +21,13 @@ def generate_certificates():
public_keys_dir = os.path.join(base_dir, 'public_keys')
secret_keys_dir = os.path.join(base_dir, 'private_keys')
- if os.path.exists(keys_dir):
- shutil.rmtree(keys_dir)
- os.mkdir(keys_dir)
-
- if os.path.exists(public_keys_dir):
- shutil.rmtree(public_keys_dir)
- os.mkdir(public_keys_dir)
+ # Create directories for certificates, remove old content if necessary
+ for d in [keys_dir, public_keys_dir, secret_keys_dir]:
+ if os.path.exists(d):
+ shutil.rmtree(d)
+ os.mkdir(d)
- if os.path.exists(secret_keys_dir):
- shutil.rmtree(secret_keys_dir)
- os.mkdir(secret_keys_dir)
-
- # create new keys in .certs dir
+ # create new keys in certificates dir
server_public_file, server_secret_file = zmq.auth.create_certificates(keys_dir, "server")
client_public_file, client_secret_file = zmq.auth.create_certificates(keys_dir, "client")
@@ -48,4 +44,7 @@ def generate_certificates():
os.path.join(secret_keys_dir, '.'))
if __name__ == '__main__':
- generate_certificates()
+ if zmq.zmq_version_info() < (4,0):
+ raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version()))
+
+ generate_certificates()
View
6 examples/security/grasslands.py
@@ -5,9 +5,11 @@
All connections are accepted, there is no authentication, and no privacy.
-This is how ZeroMQ always worked until we build security into the wire
+This is how ZeroMQ always worked until we built security into the wire
protocol in early 2013. Internally, it uses a security mechanism called
-"NULL", but you don't even see that.
+"NULL".
+
+Author: Chris Laws
'''
import time
View
11 examples/security/ioloop-ironhouse-client.py
@@ -8,12 +8,13 @@
spyware on a machine to capture data before it's encrypted, or after it's
decrypted).
-This example demonstrates the IOLoopAuthenticator
+This example demonstrates using the IOLoopAuthenticator.
+
+Author: Chris Laws
'''
import datetime
import os
-import sys
import time
import zmq
import zmq.auth
@@ -59,13 +60,17 @@ def on_message(self, frames):
if __name__ == '__main__':
+ import logging
+ import sys
+
+ if zmq.zmq_version_info() < (4,0):
+ raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version()))
verbose = False
if '-v' in sys.argv:
verbose = True
if verbose:
- import logging
logging.basicConfig(format='%(asctime)-15s %(levelname)s %(message)s',
level=logging.DEBUG)
View
13 examples/security/ioloop-ironhouse-server.py
@@ -8,12 +8,13 @@
spyware on a machine to capture data before it's encrypted, or after it's
decrypted).
-This example demonstrates the IOLoopAuthenticator
+This example demonstrates using the IOLoopAuthenticator.
+
+Author: Chris Laws
'''
import datetime
import os
-import sys
import time
import zmq
import zmq.auth
@@ -61,13 +62,17 @@ def on_sent(self, msg, status):
if __name__ == '__main__':
+ import logging
+ import sys
+
+ if zmq.zmq_version_info() < (4,0):
+ raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version()))
verbose = False
if '-v' in sys.argv:
verbose = True
- if verbose:
- import logging
+ if verbose:
logging.basicConfig(format='%(asctime)-15s %(levelname)s %(message)s',
level=logging.DEBUG)
View
15 examples/security/ironhouse.py
@@ -7,16 +7,17 @@
attack we know about, except end-point attacks (where an attacker plants
spyware on a machine to capture data before it's encrypted, or after it's
decrypted).
+
+Author: Chris Laws
'''
import os
-import sys
import time
import zmq
import zmq.auth
-def run():
+def run(verbose=False):
''' Run Ironhouse example '''
# These direcotries are generated by the generate_keys script
@@ -33,7 +34,7 @@ def run():
# Start an authenticator for this context.
auth = zmq.auth.ThreadedAuthenticator(ctx)
- auth.start(verbose=True)
+ auth.start(verbose=verbose)
auth.allow('127.0.0.1')
# Tell authenticator to use the certificate in a directory
auth.configure_curve(domain='*', location=public_keys_dir)
@@ -79,14 +80,18 @@ def run():
auth.stop()
if __name__ == '__main__':
+ import logging
+ import sys
+
+ if zmq.zmq_version_info() < (4,0):
+ raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version()))
verbose = False
if '-v' in sys.argv:
verbose = True
if verbose:
- import logging
logging.basicConfig(format='%(asctime)-15s %(levelname)s %(message)s',
level=logging.DEBUG)
- run()
+ run(verbose)
View
15 examples/security/stonehouse.py
@@ -7,16 +7,17 @@
authentication. Stonehouse is the minimum you would use over public networks,
and assures clients that they are speaking to an authentic server, while
allowing any client to connect.
+
+Author: Chris Laws
'''
import os
-import sys
import time
import zmq
import zmq.auth
-def run():
+def run(verbose=False):
''' Run Stonehouse example '''
# These direcotries are generated by the generate_keys script
@@ -33,7 +34,7 @@ def run():
# Start an authenticator for this context.
auth = zmq.auth.ThreadedAuthenticator(ctx)
- auth.start(verbose=True)
+ auth.start(verbose=verbose)
auth.allow('127.0.0.1')
# Tell the authenticator how to handle CURVE requests
auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY)
@@ -78,14 +79,18 @@ def run():
auth.stop()
if __name__ == '__main__':
+ import logging
+ import sys
+
+ if zmq.zmq_version_info() < (4,0):
+ raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version()))
verbose = False
if '-v' in sys.argv:
verbose = True
if verbose:
- import logging
logging.basicConfig(format='%(asctime)-15s %(levelname)s %(message)s',
level=logging.DEBUG)
- run()
+ run(verbose)
View
110 examples/security/strawhouse.py
@@ -7,6 +7,8 @@
uses the NULL mechanism, but we install an authentication hook that checks
the IP address against a whitelist or blacklist and allows or denies it
accordingly.
+
+Author: Chris Laws
'''
import time
@@ -14,75 +16,79 @@
import zmq.auth
-def run():
- ''' Run stawhouse client '''
- allow_test_pass = False
- deny_test_pass = False
+def run(verbose=False):
+ ''' Run stawhouse client '''
+
+ allow_test_pass = False
+ deny_test_pass = False
- ctx = zmq.Context().instance()
+ ctx = zmq.Context().instance()
- # Start an authenticator for this context.
- auth = zmq.auth.ThreadedAuthenticator(ctx)
- auth.start(verbose=True)
+ # Start an authenticator for this context.
+ auth = zmq.auth.ThreadedAuthenticator(ctx)
+ auth.start(verbose=verbose)
- # Part 1 - demonstrate allowing clients based on IP address
- auth.allow('127.0.0.1')
+ # Part 1 - demonstrate allowing clients based on IP address
+ auth.allow('127.0.0.1')
- server = ctx.socket(zmq.PUSH)
- server.zap_domain = 'global' # must come before bind
- server.bind('tcp://*:9000')
+ server = ctx.socket(zmq.PUSH)
+ server.zap_domain = 'global' # must come before bind
+ server.bind('tcp://*:9000')
- client_allow = ctx.socket(zmq.PULL)
- client_allow.connect('tcp://127.0.0.1:9000')
+ client_allow = ctx.socket(zmq.PULL)
+ client_allow.connect('tcp://127.0.0.1:9000')
- server.send("Hello")
-
- msg = client_allow.recv()
- if msg == "Hello":
- allow_test_pass = True
- else:
- allow_test_pass = False
+ server.send("Hello")
+
+ msg = client_allow.recv()
+ if msg == "Hello":
+ allow_test_pass = True
+ else:
+ allow_test_pass = False
- client_allow.close()
+ client_allow.close()
- # Part 2 - demonstrate denying clients based on IP address
- auth.deny('127.0.0.1')
+ # Part 2 - demonstrate denying clients based on IP address
+ auth.deny('127.0.0.1')
- server.send("Hello")
+ server.send("Hello")
- client_deny = ctx.socket(zmq.PULL)
- client_deny.connect('tcp://127.0.0.1:9000')
+ client_deny = ctx.socket(zmq.PULL)
+ client_deny.connect('tcp://127.0.0.1:9000')
- poller = zmq.Poller()
- poller.register(client_deny, zmq.POLLIN)
- socks = dict(poller.poll(50))
- if client_deny in socks and socks[client_deny] == zmq.POLLIN:
- msg = client_deny.recv()
- if msg == "Hello":
- deny_test_pass = False
- else:
- deny_test_pass = True
+ poller = zmq.Poller()
+ poller.register(client_deny, zmq.POLLIN)
+ socks = dict(poller.poll(50))
+ if client_deny in socks and socks[client_deny] == zmq.POLLIN:
+ msg = client_deny.recv()
+ if msg == "Hello":
+ deny_test_pass = False
+ else:
+ deny_test_pass = True
- client_deny.close()
+ client_deny.close()
- auth.stop() # stop auth thread
+ auth.stop() # stop auth thread
- if allow_test_pass and deny_test_pass:
- print "Strawhouse test OK"
- else:
- print "Strawhouse test FAIL"
+ if allow_test_pass and deny_test_pass:
+ print "Strawhouse test OK"
+ else:
+ print "Strawhouse test FAIL"
if __name__ == '__main__':
- import sys
+ import logging
+ import sys
+
+ if zmq.zmq_version_info() < (4,0):
+ raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version()))
- verbose = False
- if '-v' in sys.argv:
- verbose = True
+ verbose = False
+ if '-v' in sys.argv:
+ verbose = True
- if verbose:
- import logging
- logging.basicConfig(format='%(asctime)-15s %(levelname)s %(message)s',
- level=logging.DEBUG)
+ if verbose:
+ logging.basicConfig(format='%(asctime)-15s %(levelname)s %(message)s',
+ level=logging.DEBUG)
- run()
+ run(verbose)
View
123 examples/security/woodhouse.py
@@ -7,6 +7,8 @@
authentication). It's not really not secure, and anyone sniffing the
network (trivial with WiFi) can capture passwords and then login as if
they wanted.
+
+Author: Chris Laws
'''
import time
@@ -14,79 +16,84 @@
import zmq.auth
-def run():
- ''' Run woodhouse example '''
- valid_client_test_pass = False
- invalid_client_test_pass = False
+def run(verbose=False):
+ ''' Run woodhouse example '''
+
+ valid_client_test_pass = False
+ invalid_client_test_pass = False
- ctx = zmq.Context().instance()
+ ctx = zmq.Context().instance()
- # Start an authenticator for this context.
- auth = zmq.auth.ThreadedAuthenticator(ctx)
- auth.start(verbose=True)
- auth.allow('127.0.0.1')
- # Instruct authenticator to handle PLAIN requests
- auth.configure_plain(domain='*', passwords={'admin': 'secret'})
+ # Start an authenticator for this context.
+ auth = zmq.auth.ThreadedAuthenticator(ctx)
+ auth.start(verbose=verbose)
+ auth.allow('127.0.0.1')
+ # Instruct authenticator to handle PLAIN requests
+ auth.configure_plain(domain='*', passwords={'admin': 'secret'})
- server = ctx.socket(zmq.PUSH)
- server.plain_server = True # must come before bind
- server.bind('tcp://*:9000')
+ server = ctx.socket(zmq.PUSH)
+ server.plain_server = True # must come before bind
+ server.bind('tcp://*:9000')
- client = ctx.socket(zmq.PULL)
- client.plain_username = 'admin'
- client.plain_password = 'secret'
- client.connect('tcp://127.0.0.1:9000')
+ client = ctx.socket(zmq.PULL)
+ client.plain_username = 'admin'
+ client.plain_password = 'secret'
+ client.connect('tcp://127.0.0.1:9000')
- server.send("Hello")
+ server.send("Hello")
- poller = zmq.Poller()
- poller.register(client, zmq.POLLIN)
- socks = dict(poller.poll(50))
- if client in socks and socks[client] == zmq.POLLIN:
- msg = client.recv()
- if msg == "Hello":
- valid_client_test_pass = True
+ poller = zmq.Poller()
+ poller.register(client, zmq.POLLIN)
+ socks = dict(poller.poll(50))
+ if client in socks and socks[client] == zmq.POLLIN:
+ msg = client.recv()
+ if msg == "Hello":
+ valid_client_test_pass = True
- client.close()
+ client.close()
- # now use invalid credentials - expect no msg received
- client2 = ctx.socket(zmq.PULL)
- client2.plain_username = 'admin'
- client2.plain_password = 'bogus'
- client2.connect('tcp://127.0.0.1:9000')
+ # now use invalid credentials - expect no msg received
+ client2 = ctx.socket(zmq.PULL)
+ client2.plain_username = 'admin'
+ client2.plain_password = 'bogus'
+ client2.connect('tcp://127.0.0.1:9000')
- server.send("World")
+ server.send("World")
- poller = zmq.Poller()
- poller.register(client2, zmq.POLLIN)
- socks = dict(poller.poll(50))
- if client2 in socks and socks[client2] == zmq.POLLIN:
- msg = client.recv()
- if msg == "World":
- invalid_client_test_pass = False
- else:
- # no message is expected
- invalid_client_test_pass = True
+ poller = zmq.Poller()
+ poller.register(client2, zmq.POLLIN)
+ socks = dict(poller.poll(50))
+ if client2 in socks and socks[client2] == zmq.POLLIN:
+ msg = client.recv()
+ if msg == "World":
+ invalid_client_test_pass = False
+ else:
+ # no message is expected
+ invalid_client_test_pass = True
- # stop auth thread
- auth.stop()
+ # stop auth thread
+ auth.stop()
+
+ if valid_client_test_pass and invalid_client_test_pass:
+ print "Woodhouse test OK"
+ else:
+ print "Woodhouse test FAIL"
- if valid_client_test_pass and invalid_client_test_pass:
- print "Woodhouse test OK"
- else:
- print "Woodhouse test FAIL"
if __name__ == '__main__':
- import sys
+ import logging
+ import sys
+
+ if zmq.zmq_version_info() < (4,0):
+ raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version()))
- verbose = False
- if '-v' in sys.argv:
- verbose = True
+ verbose = False
+ if '-v' in sys.argv:
+ verbose = True
- if verbose:
- import logging
- logging.basicConfig(format='%(asctime)-15s %(levelname)s %(message)s',
- level=logging.DEBUG)
+ if verbose:
+ logging.basicConfig(format='%(asctime)-15s %(levelname)s %(message)s',
+ level=logging.DEBUG)
- run()
+ run(verbose)
View
10 zmq/auth.py
@@ -214,7 +214,6 @@ def configure_curve(self, domain='*', location=None):
'''
# If location is CURVE_ALLOW_ANY then allow all clients. Otherwise
# treat location as a directory that holds the certificates.
- logging.debug("configuring curve")
if location == CURVE_ALLOW_ANY:
self.allow_any = True
else:
@@ -583,14 +582,15 @@ def __del__(self):
class IOLoopAuthenticator(Authenticator):
''' A security authenticator that is run in an event loop '''
- def __init__(self, context, verbose=False):
- super(IOLoopAuthenticator, self).__init__(context, verbose)
+ def __init__(self, context, verbose=False, io_loop=None):
+ super(IOLoopAuthenticator, self).__init__(context, verbose=verbose)
self.zapstream = None
+ self.io_loop = io_loop or ioloop.IOLoop.current()
- def start(self):
+ def start(self, io_loop=None):
''' Run the ZAP authenticator in an event loop '''
super(IOLoopAuthenticator, self).start()
- self.zapstream = zmqstream.ZMQStream(self.zap_socket, ioloop.IOLoop.instance())
+ self.zapstream = zmqstream.ZMQStream(self.zap_socket, self.io_loop)
self.zapstream.on_recv(self.handle_zap_message)
def stop(self):
View
427 zmq/tests/test_auth.py
@@ -14,12 +14,15 @@
import os
import shutil
import tempfile
+from unittest import TestCase
import zmq
import zmq.auth
+from zmq.eventloop import ioloop, zmqstream
from zmq.tests import (BaseZMQTestCase, SkipTest)
class TestThreadedAuthentication(BaseZMQTestCase):
+ ''' Test authentication running in a thread '''
def setUp(self):
if zmq.zmq_version_info() < (4,0):
@@ -44,7 +47,7 @@ def can_connect(self, server, client):
return result
def test_null(self):
- """test NULL authentication """
+ """test threaded auth - NULL"""
# A default NULL connection should always succeed, and not
# go through our authentication infrastructure at all.
server = self.context.socket(zmq.PUSH)
@@ -64,7 +67,7 @@ def test_null(self):
server.close()
def test_blacklist_whitelist(self):
- """ test Blacklist and Whitelist authentication """
+ """test threaded auth - Blacklist and Whitelist"""
auth = zmq.auth.ThreadedAuthenticator(self.context)
auth.start()
@@ -93,7 +96,7 @@ def test_blacklist_whitelist(self):
auth.stop()
def test_plain(self):
- """test PLAIN authentication """
+ """test threaded auth - PLAIN"""
auth = zmq.auth.ThreadedAuthenticator(self.context)
auth.start()
@@ -139,12 +142,12 @@ def test_plain(self):
server.close()
def test_curve(self):
- """test CURVE authentication """
+ """test threaded auth - CURVE"""
auth = zmq.auth.ThreadedAuthenticator(self.context)
auth.start()
- # Create temporary CURVE keypairs for this test run. We create all keys in the
- # .certs directory and then move them into the appropriate private or public
+ # Create temporary CURVE keypairs for this test run. We create all keys in a
+ # temp directory and then move them into the appropriate private or public
# directory.
base_dir = tempfile.mkdtemp()
@@ -156,17 +159,14 @@ def test_curve(self):
os.mkdir(public_keys_dir)
os.mkdir(secret_keys_dir)
- # create new keys in .certs dir
server_public_file, server_secret_file = zmq.auth.create_certificates(keys_dir, "server")
client_public_file, client_secret_file = zmq.auth.create_certificates(keys_dir, "client")
- # move public keys to appropriate directory
for key_file in os.listdir(keys_dir):
if key_file.endswith(".key"):
shutil.move(os.path.join(keys_dir, key_file),
os.path.join(public_keys_dir, '.'))
- # move secret keys to appropriate directory
for key_file in os.listdir(keys_dir):
if key_file.endswith(".key_secret"):
shutil.move(os.path.join(keys_dir, key_file),
@@ -178,6 +178,8 @@ def test_curve(self):
server_public, server_secret = zmq.auth.load_certificate(server_secret_file)
client_public, client_secret = zmq.auth.load_certificate(client_secret_file)
+ auth.allow('127.0.0.1')
+
#Try CURVE authentication - without configuring server, connection should fail
server = self.context.socket(zmq.PUSH)
server.curve_publickey = server_public
@@ -231,3 +233,410 @@ def test_curve(self):
server.close()
shutil.rmtree(base_dir)
+
+
+class TestIOLoopAuthentication(TestCase):
+ ''' Test authentication running in ioloop '''
+
+ def setUp(self):
+ if zmq.zmq_version_info() < (4,0):
+ raise SkipTest("security is new in libzmq 4.0")
+
+ self.test_result = True
+ self.io_loop = ioloop.IOLoop()
+ self.auth = None
+ self.context = zmq.Context()
+ self.server = self.context.socket(zmq.PUSH)
+ self.client = self.context.socket(zmq.PULL)
+ # Only need slow reconnect intervals for testing.
+ self.client.reconnect_ivl = 1000
+ self.pushstream = zmqstream.ZMQStream(self.server, self.io_loop)
+ self.pullstream = zmqstream.ZMQStream(self.client, self.io_loop)
+
+ def tearDown(self):
+ if self.auth:
+ self.auth.stop()
+ self.auth = None
+ self.io_loop.close()
+ self.context.destroy()
+
+ def attempt_connection(self):
+ """ Check if client can connect to server using tcp transport """
+ iface = 'tcp://127.0.0.1'
+ port = self.server.bind_to_random_port(iface)
+ self.client.connect("%s:%i" % (iface, port))
+
+ def send_msg(self):
+ ''' Send a message from server to a client '''
+ msg = [b"Hello World"]
+ self.server.send_multipart(msg)
+
+ def on_message_succeed(self, frames):
+ ''' A message was received, as expected. '''
+ if frames != [b"Hello World"]:
+ self.test_result = "Unexpected message received"
+ self.test_result = True
+ self.io_loop.stop()
+
+ def on_message_fail(self, frames):
+ ''' A message was received, unexpectedly. '''
+ self.test_result = 'Received messaged unexpectedly, security failed'
+ self.io_loop.stop()
+
+ def on_test_timeout_succeed(self):
+ ''' Test timer expired, indicates test success '''
+ self.test_result = True
+ self.io_loop.stop()
+
+ def on_test_timeout_fail(self):
+ ''' Test timer expired, indicates test failure '''
+ self.test_result = 'Test timed out'
+ if hasattr(self, 'loop') and self.io_loop is not None:
+ self.io_loop.stop()
+
+ def create_certs(self):
+ ''' Create CURVE certificates for a test '''
+
+ # Create temporary CURVE keypairs for this test run. We create all keys in a
+ # temp directory and then move them into the appropriate private or public
+ # directory.
+
+ base_dir = tempfile.mkdtemp()
+ keys_dir = os.path.join(base_dir, 'certificates')
+ public_keys_dir = os.path.join(base_dir, 'public_keys')
+ secret_keys_dir = os.path.join(base_dir, 'private_keys')
+
+ os.mkdir(keys_dir)
+ os.mkdir(public_keys_dir)
+ os.mkdir(secret_keys_dir)
+
+ server_public_file, server_secret_file = zmq.auth.create_certificates(keys_dir, "server")
+ client_public_file, client_secret_file = zmq.auth.create_certificates(keys_dir, "client")
+
+ for key_file in os.listdir(keys_dir):
+ if key_file.endswith(".key"):
+ shutil.move(os.path.join(keys_dir, key_file),
+ os.path.join(public_keys_dir, '.'))
+
+ for key_file in os.listdir(keys_dir):
+ if key_file.endswith(".key_secret"):
+ shutil.move(os.path.join(keys_dir, key_file),
+ os.path.join(secret_keys_dir, '.'))
+
+ return (base_dir, public_keys_dir, secret_keys_dir)
+
+ def remove_certs(self, base_dir):
+ ''' Remove certificates for a test '''
+ shutil.rmtree(base_dir)
+
+ def load_certs(self, secret_keys_dir):
+ ''' Return server and client certificate keys '''
+ server_secret_file = os.path.join(secret_keys_dir, "server.key_secret")
+ client_secret_file = os.path.join(secret_keys_dir, "client.key_secret")
+
+ server_public, server_secret = zmq.auth.load_certificate(server_secret_file)
+ client_public, client_secret = zmq.auth.load_certificate(client_secret_file)
+
+ return server_public, server_secret, client_public, client_secret
+
+ def test_none(self):
+ ''' test ioloop auth - NONE'''
+ # A default NULL connection should always succeed, and not
+ # go through our authentication infrastructure at all.
+ self.pullstream.on_recv(self.on_message_succeed)
+
+ step1 = ioloop.DelayedCallback(self.attempt_connection, 100, self.io_loop)
+ step2 = ioloop.DelayedCallback(self.send_msg, 200, self.io_loop)
+ # Timeout the test so the test case can complete even if no message
+ # is received.
+ timeout = ioloop.DelayedCallback(self.on_test_timeout_fail, 500, self.io_loop)
+
+ step1.start()
+ step2.start()
+ timeout.start()
+
+ self.io_loop.start()
+
+ if not (self.test_result == True):
+ self.fail(self.test_result)
+
+ def test_null(self):
+ """test ioloop auth - NULL"""
+ # By setting a domain we switch on authentication for NULL sockets,
+ # though no policies are configured yet. The client connection
+ # should still be allowed.
+ self.server.zap_domain = 'global'
+ self.pullstream.on_recv(self.on_message_succeed)
+
+ step1 = ioloop.DelayedCallback(self.attempt_connection, 100, self.io_loop)
+ step2 = ioloop.DelayedCallback(self.send_msg, 200, self.io_loop)
+
+ # Timeout the test so the test case can complete even if no message
+ # is received.
+ timeout = ioloop.DelayedCallback(self.on_test_timeout_fail, 500, self.io_loop)
+
+ step1.start()
+ step2.start()
+ timeout.start()
+
+ self.io_loop.start()
+
+ if not (self.test_result == True):
+ self.fail(self.test_result)
+
+ def test_blacklist(self):
+ """ test ioloop auth - Blacklist"""
+
+ self.auth = zmq.auth.IOLoopAuthenticator(self.context, io_loop=self.io_loop)
+ self.auth.start()
+
+ # Blacklist 127.0.0.1, connection should fail
+ self.auth.deny('127.0.0.1')
+
+ self.server.zap_domain = 'global'
+ # The test should fail if a msg is received
+ self.pullstream.on_recv(self.on_message_fail)
+
+ step1 = ioloop.DelayedCallback(self.attempt_connection, 100, self.io_loop)
+ step2 = ioloop.DelayedCallback(self.send_msg, 200, self.io_loop)
+
+ # Timeout the test so the test case can complete even if no message
+ # is received.
+ timeout = ioloop.DelayedCallback(self.on_test_timeout_succeed, 500, self.io_loop)
+
+ step1.start()
+ step2.start()
+ timeout.start()
+
+ self.io_loop.start()
+
+ if not (self.test_result == True):
+ self.fail(self.test_result)
+
+
+ def test_whitelist(self):
+ """ test ioloop auth - Whitelist"""
+
+ self.auth = zmq.auth.IOLoopAuthenticator(self.context, io_loop=self.io_loop)
+ self.auth.start()
+
+ # Whitelist 127.0.0.1, which overrides the blacklist, connection should pass"
+ self.auth.allow('127.0.0.1')
+
+ self.server.setsockopt(zmq.ZAP_DOMAIN, 'global')
+ self.pullstream.on_recv(self.on_message_succeed)
+
+ step1 = ioloop.DelayedCallback(self.attempt_connection, 100, self.io_loop)
+ step2 = ioloop.DelayedCallback(self.send_msg, 200, self.io_loop)
+
+ # Timeout the test so the test case can complete even if no message
+ # is received.
+ timeout = ioloop.DelayedCallback(self.on_test_timeout_fail, 500, self.io_loop)
+
+ step1.start()
+ step2.start()
+ timeout.start()
+
+ self.io_loop.start()
+
+ if not (self.test_result == True):
+ self.fail(self.test_result)
+
+
+ def test_plain_unconfigured_server(self):
+ """test ioloop auth - PLAIN, unconfigured server"""
+ auth = zmq.auth.IOLoopAuthenticator(self.context, io_loop=self.io_loop)
+ auth.start()
+
+ self.client.plain_username = 'admin'
+ self.client.plain_password = 'Password'
+ self.pullstream.on_recv(self.on_message_fail)
+ # Try PLAIN authentication - without configuring server, connection should fail
+ self.server.plain_server = True
+
+ step1 = ioloop.DelayedCallback(self.attempt_connection, 100, self.io_loop)
+ step2 = ioloop.DelayedCallback(self.send_msg, 100, self.io_loop)
+
+ # Timeout the test so the test case can complete even if no message
+ # is received.
+ timeout = ioloop.DelayedCallback(self.on_test_timeout_succeed, 500, self.io_loop)
+
+ step1.start()
+ step2.start()
+ timeout.start()
+
+ self.io_loop.start()
+
+ if not (self.test_result == True):
+ self.fail(self.test_result)
+
+ def test_plain_configured_server(self):
+ """test ioloop auth - PLAIN, configured server"""
+ auth = zmq.auth.IOLoopAuthenticator(self.context, io_loop=self.io_loop)
+ auth.start()
+ auth.configure_plain(domain='*', passwords={'admin': 'Password'})
+
+ self.client.plain_username = 'admin'
+ self.client.plain_password = 'Password'
+ self.pullstream.on_recv(self.on_message_succeed)
+ # Try PLAIN authentication - with server configured, connection should pass
+ self.server.plain_server = True
+
+ step1 = ioloop.DelayedCallback(self.attempt_connection, 100, self.io_loop)
+ step2 = ioloop.DelayedCallback(self.send_msg, 200, self.io_loop)
+
+ # Timeout the test so the test case can complete even if no message
+ # is received.
+ timeout = ioloop.DelayedCallback(self.on_test_timeout_fail, 500, self.io_loop)
+
+ step1.start()
+ step2.start()
+ timeout.start()
+
+ self.io_loop.start()
+
+ if not (self.test_result == True):
+ self.fail(self.test_result)
+
+ def test_plain_bogus_credentials(self):
+ """test ioloop auth - PLAIN, bogus credentials"""
+ auth = zmq.auth.IOLoopAuthenticator(self.context, io_loop=self.io_loop)
+ auth.start()
+ auth.configure_plain(domain='*', passwords={'admin': 'Password'})
+
+ self.client.plain_username = 'admin'
+ self.client.plain_password = 'Bogus'
+ self.pullstream.on_recv(self.on_message_fail)
+ # Try PLAIN authentication - with server configured, connection should pass
+ self.server.plain_server = True
+
+ step1 = ioloop.DelayedCallback(self.attempt_connection, 100, self.io_loop)
+ step2 = ioloop.DelayedCallback(self.send_msg, 200, self.io_loop)
+
+ # Timeout the test so the test case can complete even if no message
+ # is received.
+ timeout = ioloop.DelayedCallback(self.on_test_timeout_succeed, 500, self.io_loop)
+
+ step1.start()
+ step2.start()
+ timeout.start()
+
+ self.io_loop.start()
+
+ if not (self.test_result == True):
+ self.fail(self.test_result)
+
+ def test_curve_unconfigured_server(self):
+ """test ioloop auth - CURVE, unconfigured server"""
+ base_dir, public_keys_dir, secret_keys_dir = self.create_certs()
+ certs = self.load_certs(secret_keys_dir)
+ server_public, server_secret, client_public, client_secret = certs
+
+ auth = zmq.auth.IOLoopAuthenticator(self.context, io_loop=self.io_loop)
+ auth.start()
+ auth.allow('127.0.0.1')
+
+ self.server.curve_publickey = server_public
+ self.server.curve_secretkey = server_secret
+ self.server.curve_server = True
+
+ self.client.curve_publickey = client_public
+ self.client.curve_secretkey = client_secret
+ self.client.curve_serverkey = server_public
+ self.pullstream.on_recv(self.on_message_fail)
+
+ step1 = ioloop.DelayedCallback(self.attempt_connection, 100, self.io_loop)
+ step2 = ioloop.DelayedCallback(self.send_msg, 200, self.io_loop)
+
+ # Timeout the test so the test case can complete even if no message
+ # is received.
+ timeout = ioloop.DelayedCallback(self.on_test_timeout_succeed, 500, self.io_loop)
+
+ step1.start()
+ step2.start()
+ timeout.start()
+
+ self.io_loop.start()
+
+ if not (self.test_result == True):
+ self.fail(self.test_result)
+
+ self.remove_certs(base_dir)
+
+ def test_curve_allow_any(self):
+ """test ioloop auth - CURVE, CURVE_ALLOW_ANY"""
+ base_dir, public_keys_dir, secret_keys_dir = self.create_certs()
+ certs = self.load_certs(secret_keys_dir)
+ server_public, server_secret, client_public, client_secret = certs
+
+ auth = zmq.auth.IOLoopAuthenticator(self.context, io_loop=self.io_loop)
+ auth.start()
+ auth.allow('127.0.0.1')
+ auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY)
+
+ self.server.curve_publickey = server_public
+ self.server.curve_secretkey = server_secret
+ self.server.curve_server = True
+
+ self.client.curve_publickey = client_public
+ self.client.curve_secretkey = client_secret
+ self.client.curve_serverkey = server_public
+ self.pullstream.on_recv(self.on_message_succeed)
+
+ step1 = ioloop.DelayedCallback(self.attempt_connection, 100, self.io_loop)
+ step2 = ioloop.DelayedCallback(self.send_msg, 200, self.io_loop)
+
+ # Timeout the test so the test case can complete even if no message
+ # is received.
+ timeout = ioloop.DelayedCallback(self.on_test_timeout_fail, 500, self.io_loop)
+
+ step1.start()
+ step2.start()
+ timeout.start()
+
+ self.io_loop.start()
+
+ if not (self.test_result == True):
+ self.fail(self.test_result)
+
+ self.remove_certs(base_dir)
+
+ def test_curve_configured_server(self):
+ """test ioloop auth - CURVE, configured server"""
+
+ auth = zmq.auth.IOLoopAuthenticator(self.context, io_loop=self.io_loop)
+ auth.start()
+ auth.allow('127.0.0.1')
+
+ base_dir, public_keys_dir, secret_keys_dir = self.create_certs()
+ certs = self.load_certs(secret_keys_dir)
+ server_public, server_secret, client_public, client_secret = certs
+
+ auth.configure_curve(domain='*', location=public_keys_dir)
+
+ self.server.curve_publickey = server_public
+ self.server.curve_secretkey = server_secret
+ self.server.curve_server = True
+
+ self.client.curve_publickey = client_public
+ self.client.curve_secretkey = client_secret
+ self.client.curve_serverkey = server_public
+ self.pullstream.on_recv(self.on_message_succeed)
+
+ step1 = ioloop.DelayedCallback(self.attempt_connection, 100, self.io_loop)
+ step2 = ioloop.DelayedCallback(self.send_msg, 200, self.io_loop)
+
+ # Timeout the test so the test case can complete even if no message
+ # is received.
+ timeout = ioloop.DelayedCallback(self.on_test_timeout_fail, 500, self.io_loop)
+
+ step1.start()
+ step2.start()
+ timeout.start()
+
+ self.io_loop.start()
+
+ if not (self.test_result == True):
+ self.fail(self.test_result)
+
+ self.remove_certs(base_dir)
Please sign in to comment.
Something went wrong with that request. Please try again.