Skip to content
Browse files

Fix up SSL tests.

Tests shouldn't fail if python doesn't have an ssl
module. Other changes include:

- Document how to test all this (i.e. how to start the server for
  these tests to run)
- Don't do requirements testing in setUp since it's run before every
  method.
- Move the "no ssl module" test into it's own test class
  • Loading branch information...
1 parent 8536acb commit dd14d3544e9b416855947f93bc2b80ede0f13824 @behackett behackett committed Feb 19, 2013
Showing with 107 additions and 96 deletions.
  1. +107 −96 test/test_ssl.py
View
203 test/test_ssl.py
@@ -25,51 +25,96 @@
from pymongo import MongoClient, MongoReplicaSetClient
from pymongo.common import HAS_SSL
from pymongo.errors import ConfigurationError, ConnectionFailure
+from test import host, port, pair
-if HAS_SSL:
- import ssl
CERT_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)),
'certificates')
CLIENT_PEM = os.path.join(CERT_PATH, 'client.pem')
CA_PEM = os.path.join(CERT_PATH, 'ca.pem')
+SIMPLE_SSL = False
+CERT_SSL = False
+SERVER_IS_RESOLVABLE = False
+
+# To fully test this start a mongod instance (built with SSL support) like so:
+# mongod --dbpath /path/to/data/directory --sslOnNormalPorts \
+# --sslPEMKeyFile /path/to/mongo/jstests/libs/server.pem \
+# --sslCAFile /path/to/mongo/jstests/libs/ca.pem \
+# --sslWeakCertificateValidation
+# Also, make sure you have 'server' as an alias for localhost in /etc/hosts
+#
+# Note: For all tests to pass with MongoReplicaSetConnection the replica
+# set configuration must use 'server' for the hostname of all hosts.
-
-def has_server_host_entry():
+def is_server_resolvable():
"""Returns True if 'server' is resolvable."""
socket_timeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(1)
try:
- socket.gethostbyname('server')
- has_server_host_entry = True
- except:
- has_server_host_entry = False
- socket.setdefaulttimeout(socket_timeout)
- return has_server_host_entry
+ try:
+ socket.gethostbyname('server')
+ return True
+ except socket.error:
+ return False
+ finally:
+ socket.setdefaulttimeout(socket_timeout)
+
+
+if HAS_SSL:
+ import ssl
+
+ # Check this all once instead of before every test method below.
+
+ # Is MongoDB configured for SSL?
+ try:
+ MongoClient(host, port, connectTimeoutMS=100, ssl=True)
+ SIMPLE_SSL = True
+ except ConnectionFailure:
+ pass
+
+ if SIMPLE_SSL:
+ # Is MongoDB configured with server.pem and ca.pem from
+ # mongodb jstests/lib?
+ try:
+ MongoClient(host, port, connectTimeoutMS=100, ssl=True,
+ ssl_certfile=CLIENT_PEM)
+ CERT_SSL = True
+ except ConnectionFailure:
+ pass
+
+ if CERT_SSL:
+ SERVER_IS_RESOLVABLE = is_server_resolvable()
+
+
+class TestNoSSLModule(unittest.TestCase):
+
+ def test_no_ssl_module(self):
+ # Test that ConfigurationError is raised if the ssl
+ # module isn't available.
+ if HAS_SSL:
+ raise SkipTest(
+ "The ssl module is available, can't test what happens "
+ "without it."
+ )
+
+ self.assertRaises(ConfigurationError,
+ MongoClient, ssl=True)
+ self.assertRaises(ConfigurationError,
+ MongoReplicaSetClient, ssl=True)
class TestSSL(unittest.TestCase):
def setUp(self):
+ if not HAS_SSL:
+ raise SkipTest("The ssl module is not available.")
+
if sys.version.startswith('3.0'):
raise SkipTest("Python 3.0.x has problems "
"with SSL and socket timeouts.")
- # MongoDB not configured for SSL?
- try:
- MongoClient(connectTimeoutMS=100, ssl=True)
- self.simple_ssl = True
- except ConnectionFailure:
- self.simple_ssl = False
-
- # MongoDB configured with server.pem, ca.pem and crl.pem from
- # mongodb jstests/lib
- try:
- MongoClient(connectTimeoutMS=100, ssl=True,
- ssl_certfile=CLIENT_PEM)
- self.cert_ssl = True
- except ConnectionFailure:
- self.cert_ssl = False
+ if not SIMPLE_SSL:
+ raise SkipTest("No simple mongod available over SSL")
def test_config_ssl(self):
"""Tests various ssl configurations"""
@@ -135,42 +180,22 @@ def test_config_ssl(self):
ssl_keyfile=CLIENT_PEM,
ssl_certfile=CLIENT_PEM)
- def test_no_ssl(self):
- # Tests what happens when ssl is off on the server but you try to
- # connect to mongodb with ssl=True
-
- if HAS_SSL:
- raise SkipTest(
- "The ssl module is available, can't test what happens "
- "without it."
- )
-
- self.assertRaises(ConfigurationError,
- MongoClient, ssl=True)
- self.assertRaises(ConfigurationError,
- MongoReplicaSetClient, ssl=True)
-
def test_simple_ssl(self):
# Expects the server to be running with ssl and with
# no --sslPEMKeyFile or with --sslWeakCertificateValidation
-
- if not HAS_SSL:
- raise SkipTest("The ssl module is not available.")
-
- if not self.simple_ssl:
- raise SkipTest("No simple mongod available over SSL")
-
- client = MongoClient(ssl=True)
+ client = MongoClient(host, port, ssl=True)
response = client.admin.command('ismaster')
if 'setName' in response:
- client = MongoReplicaSetClient(replicaSet=response['setName'],
+ client = MongoReplicaSetClient(pair,
+ replicaSet=response['setName'],
w=len(response['hosts']),
ssl=True)
db = client.pymongo_ssl_test
db.test.drop()
self.assertTrue(db.test.insert({'ssl': True}))
self.assertTrue(db.test.find_one()['ssl'])
+ client.drop_database('pymongo_ssl_test')
def test_cert_ssl(self):
# Expects the server to be running with the the server.pem, ca.pem
@@ -181,24 +206,22 @@ def test_cert_ssl(self):
# --sslCRLFile=jstests/libs/crl.pem
#
# Also requires an /etc/hosts entry where "server" is resolvable
-
- if not HAS_SSL:
- raise SkipTest("The ssl module is not available.")
-
- if not self.cert_ssl:
+ if not CERT_SSL:
raise SkipTest("No mongod available over SSL with certs")
- client = MongoClient(ssl=True, ssl_certfile=CLIENT_PEM)
+ client = MongoClient(host, port, ssl=True, ssl_certfile=CLIENT_PEM)
response = client.admin.command('ismaster')
if 'setName' in response:
- client = MongoReplicaSetClient(replicaSet=response['setName'],
+ client = MongoReplicaSetClient(pair,
+ replicaSet=response['setName'],
w=len(response['hosts']),
ssl=True, ssl_certfile=CLIENT_PEM)
db = client.pymongo_ssl_test
db.test.drop()
self.assertTrue(db.test.insert({'ssl': True}))
self.assertTrue(db.test.find_one()['ssl'])
+ client.drop_database('pymongo_ssl_test')
def test_cert_ssl_implicitly_set(self):
# Expects the server to be running with the the server.pem, ca.pem
@@ -209,24 +232,22 @@ def test_cert_ssl_implicitly_set(self):
# --sslCRLFile=jstests/libs/crl.pem
#
# Also requires an /etc/hosts entry where "server" is resolvable
-
- if not HAS_SSL:
- raise SkipTest("The ssl module is not available.")
-
- if not self.cert_ssl:
+ if not CERT_SSL:
raise SkipTest("No mongod available over SSL with certs")
- client = MongoClient(ssl_certfile=CLIENT_PEM)
+ client = MongoClient(host, port, ssl_certfile=CLIENT_PEM)
response = client.admin.command('ismaster')
if 'setName' in response:
- client = MongoReplicaSetClient(replicaSet=response['setName'],
+ client = MongoReplicaSetClient(pair,
+ replicaSet=response['setName'],
w=len(response['hosts']),
ssl_certfile=CLIENT_PEM)
db = client.pymongo_ssl_test
db.test.drop()
self.assertTrue(db.test.insert({'ssl': True}))
self.assertTrue(db.test.find_one()['ssl'])
+ client.drop_database('pymongo_ssl_test')
def test_cert_ssl_validation(self):
# Expects the server to be running with the the server.pem, ca.pem
@@ -237,15 +258,11 @@ def test_cert_ssl_validation(self):
# --sslCRLFile=jstests/libs/crl.pem
#
# Also requires an /etc/hosts entry where "server" is resolvable
-
- if not HAS_SSL:
- raise SkipTest("The ssl module is not available.")
-
- if not self.cert_ssl:
+ if not CERT_SSL:
raise SkipTest("No mongod available over SSL with certs")
- if not has_server_host_entry():
- raise SkipTest("No hosts entry for 'server' cannot validate "
+ if not SERVER_IS_RESOLVABLE:
+ raise SkipTest("No hosts entry for 'server'. Cannot validate "
"hostname in the certificate")
client = MongoClient('server',
@@ -267,6 +284,7 @@ def test_cert_ssl_validation(self):
db.test.drop()
self.assertTrue(db.test.insert({'ssl': True}))
self.assertTrue(db.test.find_one()['ssl'])
+ client.drop_database('pymongo_ssl_test')
def test_cert_ssl_validation_optional(self):
# Expects the server to be running with the the server.pem, ca.pem
@@ -277,15 +295,11 @@ def test_cert_ssl_validation_optional(self):
# --sslCRLFile=jstests/libs/crl.pem
#
# Also requires an /etc/hosts entry where "server" is resolvable
-
- if not HAS_SSL:
- raise SkipTest("The ssl module is not available.")
-
- if not self.cert_ssl:
+ if not CERT_SSL:
raise SkipTest("No mongod available over SSL with certs")
- if not has_server_host_entry():
- raise SkipTest("No hosts entry for 'server' cannot validate "
+ if not SERVER_IS_RESOLVABLE:
+ raise SkipTest("No hosts entry for 'server'. Cannot validate "
"hostname in the certificate")
client = MongoClient('server',
@@ -307,6 +321,7 @@ def test_cert_ssl_validation_optional(self):
db.test.drop()
self.assertTrue(db.test.insert({'ssl': True}))
self.assertTrue(db.test.find_one()['ssl'])
+ client.drop_database('pymongo_ssl_test')
def test_cert_ssl_validation_hostname_fail(self):
# Expects the server to be running with the the server.pem, ca.pem
@@ -315,36 +330,32 @@ def test_cert_ssl_validation_hostname_fail(self):
# --sslPEMKeyFile=jstests/libs/server.pem
# --sslCAFile=jstests/libs/ca.pem
# --sslCRLFile=jstests/libs/crl.pem
-
- if not HAS_SSL:
- raise SkipTest("The ssl module is not available.")
-
- if not self.cert_ssl:
+ if not CERT_SSL:
raise SkipTest("No mongod available over SSL with certs")
client = MongoClient(ssl=True, ssl_certfile=CLIENT_PEM)
response = client.admin.command('ismaster')
- singleServer = 'setName' not in response
+ single_server = 'setName' not in response
- if singleServer:
+ if single_server:
try:
- MongoClient('localhost',
- ssl=True,
- ssl_certfile=CLIENT_PEM,
- ssl_cert_reqs=ssl.CERT_REQUIRED,
- ssl_ca_certs=CA_PEM)
+ MongoClient(pair,
+ ssl=True,
+ ssl_certfile=CLIENT_PEM,
+ ssl_cert_reqs=ssl.CERT_REQUIRED,
+ ssl_ca_certs=CA_PEM)
self.fail("Invalid hostname should have failed")
except:
pass
else:
try:
- MongoReplicaSetClient('localhost',
- replicaSet=response['setName'],
- w=len(response['hosts']),
- ssl=True,
- ssl_certfile=CLIENT_PEM,
- ssl_cert_reqs=ssl.CERT_OPTIONAL,
- ssl_ca_certs=CA_PEM)
+ MongoReplicaSetClient(pair,
+ replicaSet=response['setName'],
+ w=len(response['hosts']),
+ ssl=True,
+ ssl_certfile=CLIENT_PEM,
+ ssl_cert_reqs=ssl.CERT_OPTIONAL,
+ ssl_ca_certs=CA_PEM)
self.fail("Invalid hostname should have failed")
except:
pass

0 comments on commit dd14d35

Please sign in to comment.
Something went wrong with that request. Please try again.