diff --git a/simp_le.py b/simp_le.py index fd29f72..7208192 100755 --- a/simp_le.py +++ b/simp_le.py @@ -82,94 +82,6 @@ class Error(Exception): """simp_le error.""" -class UnitTestCase(unittest.TestCase): - """simp_le unit test case.""" - - class AssertRaisesContext(object): - """Context for assert_raises.""" - # pylint: disable=too-few-public-methods - - def __init__(self): - self.error = None - - @contextlib.contextmanager - def assert_raises(self, exc): - """Assert raises context manager.""" - context = self.AssertRaisesContext() - try: - yield context - except exc as error: - context.error = error - else: - self.fail('Expected exception (%s) not raised' % exc) - - def assert_raises_regexp(self, exc, regexp, func, *args, **kwargs): - """Assert raises that tests exception message against regexp.""" - with self.assert_raises(exc) as context: - func(*args, **kwargs) - msg = str(context.error) - self.assertTrue(re.match(regexp, msg) is not None, - "Exception message (%s) doesn't match " - "regexp (%s)" % (msg, regexp)) - - def assert_raises_error(self, *args, **kwargs): - """Assert raises simp_le error with given message.""" - self.assert_raises_regexp(Error, *args, **kwargs) - - @staticmethod - def check_logs(level, pattern, func): - """Check whether func logs a message matching pattern. - - ``pattern`` is a regular expression to match the logs against. - ``func`` is the function to call. - ``level`` is the logging level to set during the function call. - - Returns True if there is a match, False otherwise. - """ - log_msgs = [] - - class TestHandler(logging.Handler): - """Log handler that saves logs in ``log_msgs``.""" - - def emit(self, record): - log_msgs.append(record.msg % record.args) - - handler = TestHandler(level=level) - logger.addHandler(handler) - - try: - func() - for msg in log_msgs: - if re.match(pattern, msg) is not None: - return True - return False - finally: - logger.removeHandler(handler) - - -_PEM_RE_LABELCHAR = r'[\x21-\x2c\x2e-\x7e]' -_PEM_RE = re.compile( - (r""" -^-----BEGIN\ ((?:%s(?:[- ]?%s)*)?)\s*-----$ -.*? -^-----END\ \1-----\s*""" % (_PEM_RE_LABELCHAR, _PEM_RE_LABELCHAR)).encode(), - re.DOTALL | re.MULTILINE | re.VERBOSE) -_PEMS_SEP = b'\n' - - -def split_pems(buf): - r"""Split buffer comprised of PEM encoded (RFC 7468). - - >>> x = b'\n-----BEGIN FOO BAR-----\nfoo\nbar\n-----END FOO BAR-----' - >>> len(list(split_pems(x * 3))) - 3 - >>> list(split_pems(b'')) - [] - """ - for match in _PEM_RE.finditer(buf): - yield match.group(0) - - def gen_pkey(bits): """Generate a private key. @@ -228,9 +140,9 @@ def get_le_tos_hash(le_uri): try: le_directory = requests.get(le_uri).json() except requests.ConnectionError: - raise Error("Connection to %s failed." % le_uri) + raise Error("Connection to {0} failed.".format(le_uri)) except ValueError: - raise Error("Failed to decode JSON from %s" % le_uri) + raise Error("Failed to decode JSON from {0}".format(le_uri)) le_tos_uri = le_directory['meta']['terms-of-service'] le_tos_hash = sha256_of_uri_contents(le_tos_uri) @@ -466,7 +378,7 @@ def save_to_file(self, data): persist_file.write(data) except OSError as error: logging.exception(error) - raise Error('Error when saving %s', self.path) + raise Error('Error when saving {0}'.format(self.path)) class JWKIOPlugin(IOPlugin): # pylint: disable=abstract-method @@ -477,29 +389,25 @@ def load_jwk(cls, data): """Load JWK.""" return jose.JWKRSA.json_loads(data) + @classmethod + def load_pem_jwk(cls, data): + """Load JWK encoded as PEM.""" + return jose.JWKRSA(key=serialization.load_pem_private_key( + data, password=None, backend=default_backend())) + @classmethod def dump_jwk(cls, jwk): """Dump JWK.""" return jwk.json_dumps() - -@IOPlugin.register(path='account_key.json') -class AccountKey(FileIOPlugin, JWKIOPlugin): - """Account key IO Plugin using JWS.""" - - # this is not a binary file - READ_MODE = 'r' - WRITE_MODE = 'w' - - def persisted(self): - return self.Data(account_key=True, key=False, cert=False, chain=False) - - def load_from_content(self, content): - return self.Data(account_key=self.load_jwk(content), key=None, - cert=None, chain=None) - - def save(self, data): - return self.save_to_file(self.dump_jwk(data.account_key)) + @classmethod + def dump_pem_jwk(cls, data): + """Dump JWK as PEM.""" + return data.key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ).strip() class OpenSSLIOPlugin(IOPlugin): # pylint: disable=abstract-method @@ -509,13 +417,34 @@ class OpenSSLIOPlugin(IOPlugin): # pylint: disable=abstract-method typ: One of `OpenSSL.crypto.FILETYPE_*`, used in loading/dumping. """ + _PEMS_SEP = b'\n' + def __init__(self, typ=OpenSSL.crypto.FILETYPE_PEM, **kwargs): self.typ = typ super(OpenSSLIOPlugin, self).__init__(**kwargs) + @staticmethod + def split_pems(data): + """Split buffer comprised of PEM encoded (RFC 7468).""" + pem_re_labelchar = r'[\x21-\x2c\x2e-\x7e]' + pem_re = re.compile( + (r""" + ^-----BEGIN\ ((?:%s(?:[- ]?%s)*)?)\s*-----$ + .*? + ^-----END\ \1-----\s*""" % (pem_re_labelchar, + pem_re_labelchar)).encode(), + re.DOTALL | re.MULTILINE | re.VERBOSE) + for match in pem_re.finditer(data): + yield match.group(0) + def load_key(self, data): """Load private key.""" - return ComparablePKey(OpenSSL.crypto.load_privatekey(self.typ, data)) + try: + key = OpenSSL.crypto.load_privatekey(self.typ, data) + except OpenSSL.crypto.Error: + raise Error("simp_le couldn't load a key from {0}; the " + "file might be empty or corrupt.".format(self.path)) + return ComparablePKey(key) def dump_key(self, data): """Dump private key.""" @@ -523,31 +452,162 @@ def dump_key(self, data): def load_cert(self, data): """Load certificate.""" - return jose.ComparableX509(OpenSSL.crypto.load_certificate( - self.typ, data)) + try: + cert = OpenSSL.crypto.load_certificate(self.typ, data) + except OpenSSL.crypto.Error: + raise Error("simp_le couldn't load a certificate from {0}; the " + "file might be empty or corrupt.".format(self.path)) + return jose.ComparableX509(cert) def dump_cert(self, data): """Dump certificate.""" return OpenSSL.crypto.dump_certificate(self.typ, data.wrapped).strip() -def load_pem_jwk(data): - """Load JWK encoded as PEM.""" - return jose.JWKRSA(key=serialization.load_pem_private_key( - data, password=None, backend=default_backend())) +@IOPlugin.register(path='account_key.json') +class AccountKey(FileIOPlugin, JWKIOPlugin): + """Account key IO Plugin using JWS.""" + + # this is not a binary file + READ_MODE = 'r' + WRITE_MODE = 'w' + + def persisted(self): + return self.Data(account_key=True, key=False, cert=False, chain=False) + + def load_from_content(self, content): + return self.Data( + account_key=self.load_jwk(content), + key=None, + cert=None, + chain=None, + ) + + def save(self, data): + key = self.dump_jwk(data.account_key) + return self.save_to_file(key) + + +@IOPlugin.register(path='key.der', typ=OpenSSL.crypto.FILETYPE_ASN1) +@IOPlugin.register(path='key.pem', typ=OpenSSL.crypto.FILETYPE_PEM) +class KeyFile(FileIOPlugin, OpenSSLIOPlugin): + """Private key file plugin.""" + + def persisted(self): + return self.Data(account_key=False, key=True, cert=False, chain=False) + + def load_from_content(self, content): + return self.Data( + account_key=None, + key=self.load_key(content), + cert=None, + chain=None, + ) + + def save(self, data): + key = self.dump_key(data.key) + return self.save_to_file(key) + + +@IOPlugin.register(path='cert.der', typ=OpenSSL.crypto.FILETYPE_ASN1) +@IOPlugin.register(path='cert.pem', typ=OpenSSL.crypto.FILETYPE_PEM) +class CertFile(FileIOPlugin, OpenSSLIOPlugin): + """Certificate file plugin.""" + + def persisted(self): + return self.Data(account_key=False, key=False, cert=True, chain=False) + + def load_from_content(self, content): + return self.Data( + account_key=None, + key=None, + cert=self.load_cert(content), + chain=None, + ) + + def save(self, data): + cert = self.dump_cert(data.cert) + return self.save_to_file(cert) + + +@IOPlugin.register(path='chain.pem', typ=OpenSSL.crypto.FILETYPE_PEM) +class ChainFile(FileIOPlugin, OpenSSLIOPlugin): + """Certificate chain plugin.""" + + def persisted(self): + return self.Data(account_key=False, key=False, cert=False, chain=True) + + def load_from_content(self, content): + pems = list(self.split_pems(content)) + if not pems: + raise Error("No PEM encoded message was found in {0}; " + "at least 1 was expected.".format(self.path)) + return self.Data( + account_key=None, + key=None, + cert=None, + chain=[self.load_cert(cert) for cert in pems[0:]], + ) + + def save(self, data): + pems = [self.dump_cert(cert) for cert in data.chain] + return self.save_to_file(self._PEMS_SEP.join(pems)) + + +@IOPlugin.register(path='fullchain.pem', typ=OpenSSL.crypto.FILETYPE_PEM) +class FullChainFile(FileIOPlugin, OpenSSLIOPlugin): + """Full chain file plugin.""" + + def persisted(self): + return self.Data(account_key=False, key=False, cert=True, chain=True) + + def load_from_content(self, content): + pems = list(self.split_pems(content)) + if len(pems) < 2: + raise Error("Not enough PEM encoded messages were found in {0}; " + "at least 2 were expected, found {1}." + .format(self.path, len(pems))) + return self.Data( + account_key=None, + key=None, + cert=self.load_cert(pems[0]), + chain=[self.load_cert(cert) for cert in pems[1:]], + ) + def save(self, data): + pems = [self.dump_cert(data.cert)] + pems.extend(self.dump_cert(cert) for cert in data.chain) + return self.save_to_file(self._PEMS_SEP.join(pems)) -def dump_pem_jwk(data): - """Dump JWK as PEM.""" - return data.key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption(), - ).strip() + +@IOPlugin.register(path='full.pem', typ=OpenSSL.crypto.FILETYPE_PEM) +class FullFile(FileIOPlugin, OpenSSLIOPlugin): + """Private key, certificate and chain plugin.""" + + def persisted(self): + return self.Data(account_key=False, key=True, cert=True, chain=True) + + def load_from_content(self, content): + pems = list(self.split_pems(content)) + if len(pems) < 3: + raise Error("Not enough PEM encoded messages were found in {0}; " + "at least 3 were expected, found {1}." + .format(self.path, len(pems))) + return self.Data( + account_key=None, + key=self.load_key(pems[0]), + cert=self.load_cert(pems[1]), + chain=[self.load_cert(cert) for cert in pems[2:]], + ) + + def save(self, data): + pems = [self.dump_key(data.key), self.dump_cert(data.cert)] + pems.extend(self.dump_cert(cert) for cert in data.chain) + return self.save_to_file(self._PEMS_SEP.join(pems)) @IOPlugin.register(path='external.sh', typ=OpenSSL.crypto.FILETYPE_PEM) -class ExternalIOPlugin(OpenSSLIOPlugin): +class ExternalIOPlugin(JWKIOPlugin, OpenSSLIOPlugin): """External IO Plugin. This plugin executes script that complies with the @@ -581,14 +641,14 @@ def get_output_or_fail(self, command): [self.script, command], stdin=subprocess.PIPE, stdout=subprocess.PIPE) except (OSError, subprocess.CalledProcessError) as error: - raise Error('Failed to execute external script: %s' % error) + raise Error('Failed to execute external script: {0}'.format(error)) stdout, stderr = proc.communicate() if stderr is not None: logger.error('STDERR: %s', stderr) if proc.wait(): - raise Error('External script exited with non-zero code: %d' % - proc.returncode) + raise Error('External script exited with non-zero code: {0}' + .format(proc.returncode)) # Do NOT log `stdout` as it might contain secret material (in # case key is persisted) @@ -606,12 +666,12 @@ def persisted(self): def load(self): """Call the external script to retrieve persisted data.""" - pems = list(split_pems(self.get_output_or_fail('load'))) + pems = list(self.split_pems(self.get_output_or_fail('load'))) if not pems: return self.EMPTY_DATA persisted = self.persisted() - account_key = load_pem_jwk( + account_key = self.load_pem_jwk( pems.pop(0)) if persisted.account_key else None key = self.load_key(pems.pop(0)) if persisted.key else None cert = self.load_cert(pems.pop(0)) if persisted.cert else None @@ -625,7 +685,7 @@ def save(self, data): persisted = self.persisted() output = [] if persisted.account_key: - output.append(dump_pem_jwk(data.account_key)) + output.append(self.dump_pem_jwk(data.account_key)) if persisted.key: output.append(self.dump_key(data.key)) if persisted.cert: @@ -642,14 +702,91 @@ def save(self, data): logger.exception(error) raise Error( 'There was a problem executing external IO plugin script') - stdout, stderr = proc.communicate(_PEMS_SEP.join(output)) + stdout, stderr = proc.communicate(self._PEMS_SEP.join(output)) if stdout is not None: logger.debug('STDOUT: %s', stdout) if stderr is not None: logger.error('STDERR: %s', stderr) if proc.wait(): - raise Error('External script exited with non-zero code: %d' % - proc.returncode) + raise Error('External script exited with non-zero code: {0}' + .format(proc.returncode)) + + +class UnitTestCase(unittest.TestCase): + """simp_le unit test case.""" + + class AssertRaisesContext(object): + """Context for assert_raises.""" + # pylint: disable=too-few-public-methods + + def __init__(self): + self.error = None + + @contextlib.contextmanager + def assert_raises(self, exc): + """Assert raises context manager.""" + context = self.AssertRaisesContext() + try: + yield context + except exc as error: + context.error = error + else: + self.fail('Expected exception (%s) not raised' % exc) + + def assert_raises_regexp(self, exc, regexp, func, *args, **kwargs): + """Assert raises that tests exception message against regexp.""" + with self.assert_raises(exc) as context: + func(*args, **kwargs) + msg = str(context.error) + self.assertTrue(re.match(regexp, msg) is not None, + "Exception message (%s) doesn't match " + "regexp (%s)" % (msg, regexp)) + + def assert_raises_error(self, *args, **kwargs): + """Assert raises simp_le error with given message.""" + self.assert_raises_regexp(Error, *args, **kwargs) + + @staticmethod + def check_logs(level, pattern, func): + """Check whether func logs a message matching pattern. + + ``pattern`` is a regular expression to match the logs against. + ``func`` is the function to call. + ``level`` is the logging level to set during the function call. + + Returns True if there is a match, False otherwise. + """ + log_msgs = [] + + class TestHandler(logging.Handler): + """Log handler that saves logs in ``log_msgs``.""" + + def emit(self, record): + log_msgs.append(record.msg % record.args) + + handler = TestHandler(level=level) + logger.addHandler(handler) + + try: + func() + for msg in log_msgs: + if re.match(pattern, msg) is not None: + return True + return False + finally: + logger.removeHandler(handler) + + +class SplitPemsTest(UnitTestCase): + """split_pems static method test.""" + # this is a test suite | pylint: disable=missing-docstring + + def test_split_pems(self): + pem = b'\n-----BEGIN FOO BAR-----\nfoo\nbar\n-----END FOO BAR-----' + result = len(list(OpenSSLIOPlugin.split_pems(pem * 3))) + self.assertEqual(result, 3) + result = list(OpenSSLIOPlugin.split_pems(b'')) + self.assertEqual(result, []) class PluginIOTestMixin(object): @@ -690,9 +827,17 @@ class FileIOPluginTestMixin(PluginIOTestMixin): """Common FileIO plugins tests.""" # this is a test suite | pylint: disable=missing-docstring + PEM = b'\n-----BEGIN FOO BAR-----\nfoo\nbar\n-----END FOO BAR-----' + def test_empty(self): self.assertEqual(IOPlugin.EMPTY_DATA, self.plugin.load()) + def test_load_empty_or_bad_content(self): + self.assert_raises_error('.*the file might be empty or corrupt.', + self.plugin.load_from_content, b'') + self.assert_raises_error('.*the file might be empty or corrupt.', + self.plugin.load_from_content, self.PEM) + def test_save_ignore_unpersisted(self): self.plugin.save(self.all_data) self.assertEqual(self.plugin.load(), IOPlugin.Data( @@ -700,45 +845,45 @@ def test_save_ignore_unpersisted(self): zip(self.plugin.persisted(), self.all_data)))) -class PortNumWarningTest(UnitTestCase): - """Tests relating to the port number warning.""" +class ChainFileIOPluginTestMixin(FileIOPluginTestMixin): + """Common Chain type FileIO plugins tests.""" + # this is a test suite | pylint: disable=missing-docstring - def _check_warn(self, should_log, path): - """test whether the supplied path triggers the port number warning. + def test_load_empty_or_bad_content(self): + self.assert_raises_error('.*PEM encoded message.*', + self.plugin.load_from_content, b'') + self.assert_raises_error('.*the file might be empty or corrupt.', + self.plugin.load_from_content, self.PEM * 3) - ``should_log`` is a boolean indicating whether or not we expect the - path to trigger a warning. - ``path`` is the webroot path to check. - If ``should_log`` is inconsistent with the behavior of - ``compute_roots`` given ``path``, the test fails. - """ - return self.assertEqual( - self.check_logs( - logging.WARN, - '.*looks like it is a port number.*', - lambda: compute_roots([ - Vhost('example.com', path), - ], 'webroot') - ), - should_log, - ) +class KeyFileTest(FileIOPluginTestMixin, UnitTestCase): + """Tests for KeyFile.""" + # this is a test suite | pylint: disable=missing-docstring + PLUGIN_CLS = KeyFile - def test_warn_port(self): - """A bare port number triggers the warning.""" - self._check_warn(True, '8000') - def test_warn_port_path(self): - """``port_no:path`` triggers the warning.""" - self._check_warn(True, '8000:/webroot') +class CertFileTest(FileIOPluginTestMixin, UnitTestCase): + """Tests for CertFile.""" + # this is a test suite | pylint: disable=missing-docstring + PLUGIN_CLS = CertFile - def test_no_warn_path(self): - """A bare path doesn't trigger the warning.""" - self._check_warn(False, '/my-web-root') - def test_no_warn_bigport(self): - """A number too big to be a port doesn't trigger the warning.""" - self._check_warn(False, '66000') +class ChainFileTest(ChainFileIOPluginTestMixin, UnitTestCase): + """Tests for ChainFile.""" + # this is a test suite | pylint: disable=missing-docstring + PLUGIN_CLS = ChainFile + + +class FullChainFileTest(ChainFileIOPluginTestMixin, UnitTestCase): + """Tests for FullChainFile.""" + # this is a test suite | pylint: disable=missing-docstring + PLUGIN_CLS = FullChainFile + + +class FullFileTest(ChainFileIOPluginTestMixin, UnitTestCase): + """Tests for FullFile.""" + # this is a test suite | pylint: disable=missing-docstring + PLUGIN_CLS = FullFile class ExternalIOPluginTest(PluginIOTestMixin, UnitTestCase): @@ -792,127 +937,45 @@ def test_it(self): self.assertEqual(self.all_data, self.plugin.load()) -@IOPlugin.register(path='chain.pem', typ=OpenSSL.crypto.FILETYPE_PEM) -class ChainFile(FileIOPlugin, OpenSSLIOPlugin): - """Certificate chain plugin.""" - - def persisted(self): - return self.Data(account_key=False, key=False, cert=False, chain=True) - - def load_from_content(self, content): - chain = [self.load_cert(cert_data) - for cert_data in split_pems(content)] - return self.Data(account_key=None, key=None, cert=None, chain=chain) - - def save(self, data): - return self.save_to_file(_PEMS_SEP.join( - self.dump_cert(chain_cert) for chain_cert in data.chain)) - - -class ChainFileTest(FileIOPluginTestMixin, UnitTestCase): - """Tests for ChainFile.""" - # this is a test suite | pylint: disable=missing-docstring - PLUGIN_CLS = ChainFile - - -@IOPlugin.register(path='fullchain.pem', typ=OpenSSL.crypto.FILETYPE_PEM) -class FullChainFile(ChainFile): - """Full chain file plugin.""" - - def persisted(self): - return self.Data(account_key=False, key=False, cert=True, chain=True) - - def load(self): - data = super(FullChainFile, self).load() - if data.chain is None: - cert, chain = None, None - else: - cert, chain = data.chain[0], data.chain[1:] - return self.Data(account_key=data.account_key, key=data.key, - cert=cert, chain=chain) - - def save(self, data): - return super(FullChainFile, self).save(self.Data( - account_key=data.account_key, key=data.key, - cert=None, chain=([data.cert] + data.chain))) - - -class FullChainFileTest(FileIOPluginTestMixin, UnitTestCase): - """Tests for FullChainFile.""" - # this is a test suite | pylint: disable=missing-docstring - PLUGIN_CLS = FullChainFile - - -@IOPlugin.register(path='key.der', typ=OpenSSL.crypto.FILETYPE_ASN1) -@IOPlugin.register(path='key.pem', typ=OpenSSL.crypto.FILETYPE_PEM) -class KeyFile(FileIOPlugin, OpenSSLIOPlugin): - """Private key file plugin.""" - - def persisted(self): - return self.Data(account_key=False, key=True, cert=False, chain=False) - - def load_from_content(self, content): - return self.Data(account_key=None, key=self.load_key(content), - cert=None, chain=None) - - def save(self, data): - return self.save_to_file(self.dump_key(data.key)) - - -class KeyFileTest(FileIOPluginTestMixin, UnitTestCase): - """Tests for KeyFile.""" - # this is a test suite | pylint: disable=missing-docstring - PLUGIN_CLS = KeyFile - - -@IOPlugin.register(path='cert.der', typ=OpenSSL.crypto.FILETYPE_ASN1) -@IOPlugin.register(path='cert.pem', typ=OpenSSL.crypto.FILETYPE_PEM) -class CertFile(FileIOPlugin, OpenSSLIOPlugin): - """Certificate file plugin.""" - - def persisted(self): - return self.Data(account_key=False, key=False, cert=True, chain=False) - - def load_from_content(self, content): - return self.Data(account_key=None, key=None, - cert=self.load_cert(content), chain=None) - - def save(self, data): - return self.save_to_file(self.dump_cert(data.cert)) - - -class CertFileTest(FileIOPluginTestMixin, UnitTestCase): - """Tests for CertFile.""" - # this is a test suite | pylint: disable=missing-docstring - PLUGIN_CLS = CertFile - +class PortNumWarningTest(UnitTestCase): + """Tests relating to the port number warning.""" -@IOPlugin.register(path='full.pem', typ=OpenSSL.crypto.FILETYPE_PEM) -class FullFile(FileIOPlugin, OpenSSLIOPlugin): - """Private key, certificate and chain plugin.""" + def _check_warn(self, should_log, path): + """test whether the supplied path triggers the port number warning. - def persisted(self): - return self.Data(account_key=False, key=True, cert=True, chain=True) + ``should_log`` is a boolean indicating whether or not we expect the + path to trigger a warning. + ``path`` is the webroot path to check. - def load_from_content(self, content): - pems = split_pems(content) - return self.Data( - account_key=None, - key=self.load_key(next(pems)), - cert=self.load_cert(next(pems)), - chain=[self.load_cert(cert) for cert in pems], + If ``should_log`` is inconsistent with the behavior of + ``compute_roots`` given ``path``, the test fails. + """ + return self.assertEqual( + self.check_logs( + logging.WARN, + '.*looks like it is a port number.*', + lambda: compute_roots([ + Vhost('example.com', path), + ], 'webroot') + ), + should_log, ) - def save(self, data): - pems = [self.dump_key(data.key), self.dump_cert(data.cert)] - pems.extend(self.dump_cert(cert) for cert in data.chain) - self.save_to_file(_PEMS_SEP.join(pems)) + def test_warn_port(self): + """A bare port number triggers the warning.""" + self._check_warn(True, '8000') + def test_warn_port_path(self): + """``port_no:path`` triggers the warning.""" + self._check_warn(True, '8000:/webroot') -class FullFileTest(FileIOPluginTestMixin, UnitTestCase): - """Tests for FullFile.""" - # this is a test suite | pylint: disable=missing-docstring - PLUGIN_CLS = FullFile + def test_no_warn_path(self): + """A bare path doesn't trigger the warning.""" + self._check_warn(False, '/my-web-root') + + def test_no_warn_bigport(self): + """A number too big to be a port doesn't trigger the warning.""" + self._check_warn(False, '66000') def create_parser(): @@ -1083,9 +1146,9 @@ def compute_roots(vhosts, default_root): empty_roots = dict((name, root) for name, root in six.iteritems(roots) if root is None) if empty_roots: - raise Error('Root for the following host(s) were not specified: %s. ' + raise Error('Root for the following host(s) were not specified: {0}. ' 'Try --default_root or use -d example.com:/var/www/html ' - 'syntax' % ', '.join(empty_roots)) + 'syntax'.format(', '.join(empty_roots))) return roots @@ -1136,7 +1199,7 @@ def sha256_of_uri_contents(uri, chunk_size=10): try: response = requests.get(uri, stream=True) except requests.ConnectionError: - raise Error("Connection to %s failed." % uri) + raise Error("Connection to {0} failed.".format(uri)) for chunk in response.iter_content(chunk_size): h.update(chunk) @@ -1263,7 +1326,7 @@ def check_plugins_persist_all(ioplugins): if not persist]) if not_persisted: raise Error('Selected IO plugins do not cover the following ' - 'components: %s.' % ', '.join(not_persisted)) + 'components: {0}.'.format(', '.join(not_persisted))) def load_existing_data(ioplugins): @@ -1287,7 +1350,7 @@ def merge(first, second, field): """ if first is not None and second is not None and first != second: raise Error('Some plugins returned conflicting data for ' - 'the "%s" component' % field) + 'the "{0}" component'.format(field)) return first or second all_existing = IOPlugin.EMPTY_DATA @@ -1380,7 +1443,7 @@ def registered_client(args, existing_account_key): tos_hash = sha256_of_uri_contents(regr.terms_of_service) logger.debug('TOS hash: %s', tos_hash) if args.tos_sha256 != tos_hash: - raise Error('TOS hash mismatch. Found: %s.' % tos_hash) + raise Error('TOS hash mismatch. Found: {0}.'.format(tos_hash)) client.agree_to_tos(regr) return client @@ -1531,8 +1594,8 @@ def main_with_exceptions(cli_args): if args.email is not None: match = re.match(r'.+@[A-Za-z0-9._-]+', args.email) if not match: - raise Error("The email address you provided (%s) does not appear" - "to be valid." % args.email) + raise Error("The email address you provided ({0}) does not appear" + "to be valid.".format(args.email)) existing_data = load_existing_data(args.ioplugins) if valid_existing_cert(existing_data.cert, args.vhosts, args.valid_min):