# Fetching contributors…
# Cannot retrieve contributors at this time
# 242 lines (197 sloc) 8.53 KB
from __future__ import absolute_import, division, print_function
import errno
import os
import signal
import socket
from subprocess import Popen
import sys
import time
from tornado.netutil import (
    BlockingResolver, OverrideResolver, ThreadedResolver, is_valid_ip, bind_sockets
)
from tornado.stack_context import ExceptionStackContext
from tornado.testing import AsyncTestCase, gen_test, bind_unused_port
from tornado.test.util import unittest, skipIfNoNetwork

# Optional dependencies: each name is bound to None when its package is
# missing, which is what the unittest.skipIf guards further down test for.
try:
    from concurrent import futures
except ImportError:
    futures = None

try:
    import pycares  # type: ignore
except ImportError:
    pycares = None
else:
    # Import the resolver only once its backing library imported cleanly.
    from tornado.platform.caresresolver import CaresResolver

try:
    import twisted  # type: ignore
    import twisted.names  # type: ignore
except ImportError:
    twisted = None
else:
    from tornado.platform.twisted import TwistedResolver
class _ResolverTestMixin(object):
    """Success-path tests shared by the resolver test cases.

    The concrete test case is expected to assign the resolver under test to
    ``self.resolver`` and to provide ``stop``/``wait`` (as the classes below
    do by also inheriting from ``AsyncTestCase``).
    """

    def test_localhost(self):
        """Callback interface: 'localhost' resolves to the IPv4 loopback."""
        self.resolver.resolve('localhost', 80, callback=self.stop)
        result = self.wait()
        # The address literal was empty in the damaged source; a resolver
        # never yields '' for localhost, so the loopback address is restored.
        self.assertIn((socket.AF_INET, ('127.0.0.1', 80)), result)

    # gen_test is required here: without it the test runner would call the
    # generator function, never iterate it, and report a vacuous pass.
    @gen_test
    def test_future_interface(self):
        """Future interface: yielding the result gives the same entries."""
        addrinfo = yield self.resolver.resolve('localhost', 80,
                                               socket.AF_UNSPEC)
        self.assertIn((socket.AF_INET, ('127.0.0.1', 80)),
                      addrinfo)
# It is impossible to quickly and consistently generate an error in name
# resolution, so test this case separately, using mocks as needed.
class _ResolverErrorTestMixin(object):
    """Failure-path tests shared by the resolver error test cases.

    The concrete test case is expected to assign ``self.resolver`` and to
    arrange for lookups to fail (the classes below mock
    ``socket.getaddrinfo`` for that purpose).
    """

    def test_bad_host(self):
        """Callback interface: a failed lookup surfaces as an exception."""
        def handler(exc_typ, exc_val, exc_tb):
            # Hand the exception to wait(); otherwise the only thing that
            # could reach the assertion below is a successful result list.
            self.stop(exc_val)
            return True  # Halt propagation.

        with ExceptionStackContext(handler):
            self.resolver.resolve('an invalid domain', 80, callback=self.stop)

        result = self.wait()
        self.assertIsInstance(result, Exception)

    # gen_test drives the generator; without it the body would never run.
    @gen_test
    def test_future_interface_bad_host(self):
        """Future interface: a failed lookup raises IOError when yielded."""
        with self.assertRaises(IOError):
            yield self.resolver.resolve('an invalid domain', 80,
                                        socket.AF_UNSPEC)
def _failing_getaddrinfo(*args, **kwargs):
    """Dummy implementation of getaddrinfo for use in mocks.

    Accepts and ignores any positional or keyword arguments, so it can stand
    in for ``socket.getaddrinfo`` however the caller invokes it.

    Raises:
        socket.gaierror: always, carrying ``errno.EIO``.
    """
    raise socket.gaierror(errno.EIO, "mock: lookup failed")
class BlockingResolverTest(AsyncTestCase, _ResolverTestMixin):
    """Runs the shared success-path tests against BlockingResolver."""

    def setUp(self):
        super(BlockingResolverTest, self).setUp()
        self.resolver = BlockingResolver()
# getaddrinfo-based tests need mocking to reliably generate errors;
# some configurations are slow to produce errors and take longer than
# our default timeout.
class BlockingResolverErrorTest(AsyncTestCase, _ResolverErrorTestMixin):
    """Runs the shared failure-path tests against BlockingResolver.

    ``socket.getaddrinfo`` is swapped for a stub that always raises, and is
    put back in ``tearDown``.
    """

    def setUp(self):
        super(BlockingResolverErrorTest, self).setUp()
        self.resolver = BlockingResolver()
        # Keep a handle on the real function so tearDown can restore it.
        self.real_getaddrinfo = socket.getaddrinfo
        socket.getaddrinfo = _failing_getaddrinfo

    def tearDown(self):
        # Undo the patch before the base class tears down.
        socket.getaddrinfo = self.real_getaddrinfo
        super(BlockingResolverErrorTest, self).tearDown()
class OverrideResolverTest(AsyncTestCase, _ResolverTestMixin):
    """Tests OverrideResolver wrapped around a BlockingResolver."""

    def setUp(self):
        super(OverrideResolverTest, self).setUp()
        # NOTE(review): the host names and IPv4 literals in this class are
        # empty strings in the recovered source, which looks like extraction
        # damage (the IPv6 literal survived). They are kept verbatim here;
        # restore the intended host/IPv4 values before relying on this test.
        mapping = {
            ('', 80): ('', 80),
            ('', 80, socket.AF_INET): ('', 80),
            ('', 80, socket.AF_INET6): ('2a02:6b8:7c:40c:c51e:495f:e23a:3', 80)
        }
        self.resolver = OverrideResolver(BlockingResolver(), mapping)

    def test_resolve_multiaddr(self):
        """The same host/port resolves differently per address family."""
        self.resolver.resolve('', 80, socket.AF_INET, callback=self.stop)
        result = self.wait()
        self.assertIn((socket.AF_INET, ('', 80)), result)

        self.resolver.resolve('', 80, socket.AF_INET6, callback=self.stop)
        result = self.wait()
        self.assertIn((socket.AF_INET6, ('2a02:6b8:7c:40c:c51e:495f:e23a:3', 80, 0, 0)), result)
@unittest.skipIf(futures is None, "futures module not present")
class ThreadedResolverTest(AsyncTestCase, _ResolverTestMixin):
    """Runs the shared success-path tests against ThreadedResolver."""

    def setUp(self):
        super(ThreadedResolverTest, self).setUp()
        self.resolver = ThreadedResolver()

    def tearDown(self):
        # Release the resolver created in setUp; the recovered tearDown only
        # delegated to the base class and left the resolver unclosed.
        self.resolver.close()
        super(ThreadedResolverTest, self).tearDown()
class ThreadedResolverErrorTest(AsyncTestCase, _ResolverErrorTestMixin):
    """Runs the shared failure-path tests with getaddrinfo stubbed out.

    NOTE(review): despite the class name, setUp builds a BlockingResolver,
    not a ThreadedResolver. That is preserved as found; confirm whether it
    is intentional.
    """

    def setUp(self):
        super(ThreadedResolverErrorTest, self).setUp()
        self.resolver = BlockingResolver()
        # Keep a handle on the real function so tearDown can restore it.
        self.real_getaddrinfo = socket.getaddrinfo
        socket.getaddrinfo = _failing_getaddrinfo

    def tearDown(self):
        # Undo the patch before the base class tears down.
        socket.getaddrinfo = self.real_getaddrinfo
        super(ThreadedResolverErrorTest, self).tearDown()
@unittest.skipIf(futures is None, "futures module not present")
@unittest.skipIf(sys.platform == 'win32', "preexec_fn not available on win32")
class ThreadedResolverImportTest(unittest.TestCase):
    def test_import(self):
        """Importing the helper module in a child process must not hang."""
        # NOTE(review): the TIMEOUT definition was lost from the recovered
        # source; 5 seconds is a stand-in -- confirm the intended value.
        TIMEOUT = 5

        # Test for a deadlock when importing a module that runs the
        # ThreadedResolver at import-time. See the imported helper module
        # (tornado.test.resolve_test_helper) for the full explanation.
        command = [
            sys.executable,
            '-c',
            'import tornado.test.resolve_test_helper']

        start = time.time()
        # preexec_fn runs in the child before exec: the alarm makes the
        # child receive SIGALRM after TIMEOUT seconds if it is still running.
        popen = Popen(command, preexec_fn=lambda: signal.alarm(TIMEOUT))
        while time.time() - start < TIMEOUT:
            return_code = popen.poll()
            if return_code is not None:
                self.assertEqual(0, return_code)
                return  # Success.
            time.sleep(0.05)

        self.fail("import timed out")
# We do not test errors with CaresResolver:
# Some DNS-hijacking ISPs (e.g. Time Warner) return non-empty results
# with an NXDOMAIN status code. Most resolvers treat this as an error;
# C-ares returns the results, making the "bad_host" tests unreliable.
# C-ares will try to resolve even malformed names, such as the
# name with spaces used in this test.
@unittest.skipIf(pycares is None, "pycares module not present")
class CaresResolverTest(AsyncTestCase, _ResolverTestMixin):
    """Runs the shared success-path tests against CaresResolver."""

    def setUp(self):
        super(CaresResolverTest, self).setUp()
        self.resolver = CaresResolver()
# TwistedResolver produces consistent errors in our test cases so we
# could test the regular and error cases in the same class. However,
# in the error cases it appears that cleanup of socket objects is
# handled asynchronously and occasionally results in "unclosed socket"
# warnings if not given time to shut down (and there is no way to
# explicitly shut it down). This makes the test flaky, so we do not
# test error cases here.
@unittest.skipIf(twisted is None, "twisted module not present")
# Compare the version numerically: as plain strings '9.0' sorts after '12.1',
# so a lexicographic check would not skip single-digit major versions.
@unittest.skipIf(
    tuple(int(part) for part in
          getattr(twisted, '__version__', '0.0').split('.')[:2]
          if part.isdigit()) < (12, 1),
    "old version of twisted")
class TwistedResolverTest(AsyncTestCase, _ResolverTestMixin):
    """Runs the shared success-path tests against TwistedResolver."""

    def setUp(self):
        super(TwistedResolverTest, self).setUp()
        self.resolver = TwistedResolver()
class IsValidIPTest(unittest.TestCase):
    def test_is_valid_ip(self):
        """is_valid_ip accepts IP literals and rejects everything else."""
        # NOTE(review): the recovered source had no positive cases and
        # repeated the empty string three times, which looks like stripped
        # literals. Two loopback literals are added as positive cases and
        # the duplicates are collapsed.
        for address in ('127.0.0.1', '::1'):
            self.assertTrue(is_valid_ip(address),
                            "%r should be a valid IP" % (address,))

        for candidate in ('', 'localhost', '<', ' ', '\n', '\x00'):
            self.assertFalse(is_valid_ip(candidate),
                             "%r should not be a valid IP" % (candidate,))
class TestPortAllocation(unittest.TestCase):
    def test_same_port_allocation(self):
        """bind_sockets(None, ...) gives every returned socket the same port."""
        if 'TRAVIS' in os.environ:
            self.skipTest("dual-stack servers often have port conflicts on travis")
        sockets = bind_sockets(None, 'localhost')
        try:
            port = sockets[0].getsockname()[1]
            self.assertTrue(all(s.getsockname()[1] == port
                                for s in sockets[1:]))
        finally:
            # Always release the listening sockets, even if the check fails.
            for sock in sockets:
                sock.close()

    @unittest.skipIf(not hasattr(socket, "SO_REUSEPORT"), "SO_REUSEPORT is not supported")
    def test_reuse_port(self):
        """A port bound with reuse_port=True can be bound a second time."""
        sockets = []
        # Named first_sock so the local does not shadow the socket module.
        first_sock, port = bind_unused_port(reuse_port=True)
        try:
            # NOTE(review): the address is '' in the recovered source; it may
            # be a stripped IPv4 literal -- confirm the intended address.
            sockets = bind_sockets(port, '', reuse_port=True)
            self.assertTrue(all(s.getsockname()[1] == port for s in sockets))
        finally:
            first_sock.close()
            for sock in sockets:
                sock.close()