diff --git a/OpenSSL/test/test_ssl.py b/OpenSSL/test/test_ssl.py index bb1c9aedf..1f231c9c0 100644 --- a/OpenSSL/test/test_ssl.py +++ b/OpenSSL/test/test_ssl.py @@ -1199,11 +1199,11 @@ def verify_callback(*args): def test_add_extra_chain_cert(self): """ - :py:obj:`Context.add_extra_chain_cert` accepts an :py:obj:`X509` instance to add to - the certificate chain. + :py:obj:`Context.add_extra_chain_cert` accepts an :py:obj:`X509` + instance to add to the certificate chain. - See :py:obj:`_create_certificate_chain` for the details of the certificate - chain tested. + See :py:obj:`_create_certificate_chain` for the details of the + certificate chain tested. The chain is tested by starting a server with scert and connecting to it with a client which trusts cacert and requires verification to @@ -1214,15 +1214,17 @@ def test_add_extra_chain_cert(self): # Dump the CA certificate to a file because that's the only way to load # it as a trusted CA in the client context. - for cert, name in [(cacert, 'ca.pem'), (icert, 'i.pem'), (scert, 's.pem')]: - fObj = open(name, 'w') - fObj.write(dump_certificate(FILETYPE_PEM, cert).decode('ascii')) - fObj.close() - - for key, name in [(cakey, 'ca.key'), (ikey, 'i.key'), (skey, 's.key')]: - fObj = open(name, 'w') - fObj.write(dump_privatekey(FILETYPE_PEM, key).decode('ascii')) - fObj.close() + for cert, name in [(cacert, 'ca.pem'), + (icert, 'i.pem'), + (scert, 's.pem')]: + with open(join(self.tmpdir, name), 'w') as f: + f.write(dump_certificate(FILETYPE_PEM, cert).decode('ascii')) + + for key, name in [(cakey, 'ca.key'), + (ikey, 'i.key'), + (skey, 's.key')]: + with open(join(self.tmpdir, name), 'w') as f: + f.write(dump_privatekey(FILETYPE_PEM, key).decode('ascii')) # Create the server context serverContext = Context(TLSv1_METHOD) @@ -1235,7 +1237,7 @@ def test_add_extra_chain_cert(self): clientContext = Context(TLSv1_METHOD) clientContext.set_verify( VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT, verify_cb) - clientContext.load_verify_locations(b"ca.pem") + clientContext.load_verify_locations(join(self.tmpdir, "ca.pem")) # Try it out. self._handshake_test(serverContext, clientContext) diff --git a/OpenSSL/test/util.py b/OpenSSL/test/util.py index b8be91deb..78b4a3fc9 100644 --- a/OpenSSL/test/util.py +++ b/OpenSSL/test/util.py @@ -7,18 +7,20 @@ U{Twisted}. """ +import os import shutil +import sys import traceback -import os, os.path -from tempfile import mktemp + +from tempfile import mktemp, mkdtemp from unittest import TestCase -import sys from six import PY3 from OpenSSL._util import exception_from_error_queue from OpenSSL.crypto import Error + try: import memdbg except Exception: @@ -28,14 +30,16 @@ class _memdbg(object): heap = None from OpenSSL._util import ffi, lib, byte_string as b + # This is the UTF-8 encoding of the SNOWMAN unicode code point. NON_ASCII = b("\xe2\x98\x83").decode("utf-8") + class TestCase(TestCase): """ - :py:class:`TestCase` adds useful testing functionality beyond what is available - from the standard library :py:class:`unittest.TestCase`. + :py:class:`TestCase` adds useful testing functionality beyond what is + available from the standard library :py:class:`unittest.TestCase`. """ def run(self, result): run = super(TestCase, self).run @@ -157,24 +161,38 @@ def format_leak(p): (None, Exception(stack % (allocs_report,)), None)) + _tmpdir = None + + + @property + def tmpdir(self): + """ + On demand create a temporary directory. + """ + if self._tmpdir is not None: + return self._tmpdir + + self._tmpdir = mkdtemp(dir=".") + return self._tmpdir + + def tearDown(self): """ - Clean up any files or directories created using :py:meth:`TestCase.mktemp`. - Subclasses must invoke this method if they override it or the - cleanup will not occur. + Clean up any files or directories created using + :py:meth:`TestCase.mktemp`. Subclasses must invoke this method if they + override it or the cleanup will not occur. """ - if False and self._temporaryFiles is not None: - for temp in self._temporaryFiles: - if os.path.isdir(temp): - shutil.rmtree(temp) - elif os.path.exists(temp): - os.unlink(temp) + if self._tmpdir is not None: + shutil.rmtree(self._tmpdir) + try: exception_from_error_queue(Error) except Error: e = sys.exc_info()[1] if e.args != ([],): - self.fail("Left over errors in OpenSSL error queue: " + repr(e)) + self.fail( + "Left over errors in OpenSSL error queue: " + repr(e) + ) def assertIsInstance(self, instance, classOrTuple, message=None): @@ -295,16 +313,13 @@ def failUnlessRaises(self, exception, f, *args, **kwargs): assertRaises = failUnlessRaises - _temporaryFiles = None def mktemp(self): """ - Pathetic substitute for twisted.trial.unittest.TestCase.mktemp. + Return UTF-8-encoded bytes of a path to a tmp file. + + The file will be cleaned up after the test run. """ - if self._temporaryFiles is None: - self._temporaryFiles = [] - temp = b(mktemp(dir=".")) - self._temporaryFiles.append(temp) - return temp + return mktemp(dir=self.tmpdir).encode("utf-8") # Other stuff