diff --git a/Lib/test/test_nntplib.py b/Lib/test/test_nntplib.py index 8c1032b986bf61f..a8787cf379cba46 100644 --- a/Lib/test/test_nntplib.py +++ b/Lib/test/test_nntplib.py @@ -3,7 +3,6 @@ import datetime import textwrap import unittest -import functools import contextlib import os.path import threading @@ -19,6 +18,7 @@ TIMEOUT = 30 +TEST_PASSWORD = 'testpw' certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem') # TODO: @@ -27,6 +27,489 @@ # - test auth and `usenetrc` +class NNTPv1Handler: + """A handler for RFC 977""" + + welcome = "200 NNTP mock server" + + def start(self, readline, push_data): + self.in_body = False + self.allow_posting = True + self._readline = readline + self._push_data = push_data + self._logged_in = False + self._user_sent = False + # Our welcome + self.handle_welcome() + + def _decode(self, data): + return str(data, "utf-8", "surrogateescape") + + def process_pending(self): + if self.in_body: + while True: + line = self._readline() + if not line: + return + self.body.append(line) + if line == b".\r\n": + break + try: + meth, tokens = self.body_callback + meth(*tokens, body=self.body) + finally: + self.body_callback = None + self.body = None + self.in_body = False + while True: + line = self._decode(self._readline()) + if not line: + return + if not line.endswith("\r\n"): + raise ValueError("line doesn't end with \\r\\n: {!r}".format(line)) + line = line[:-2] + cmd, *tokens = line.split() + meth = getattr(self, "handle_" + cmd.upper(), None) + if meth is None: + self.handle_unknown() + else: + try: + meth(*tokens) + except Exception as e: + raise ValueError("command failed: {!r}".format(line)) from e + else: + if self.in_body: + self.body_callback = meth, tokens + self.body = [] + + def expect_body(self): + """Flag that the client is expected to post a request body""" + self.in_body = True + + def push_data(self, data): + """Push some binary data""" + self._push_data(data) + + def push_lit(self, lit): + """Push a string literal""" + lit = textwrap.dedent(lit) + lit = "\r\n".join(lit.splitlines()) + "\r\n" + lit = lit.encode('utf-8') + self.push_data(lit) + + def handle_unknown(self): + self.push_lit("500 What?") + + def handle_welcome(self): + self.push_lit(self.welcome) + + def handle_QUIT(self): + self.push_lit("205 Bye!") + + def handle_DATE(self): + self.push_lit("111 20100914001155") + + def handle_GROUP(self, group): + if group in {"fr.comp.lang.python", "comp.lang.python"}: + self.push_lit(f"211 486 761 1265 {group}") + else: + self.push_lit("411 No such group {}".format(group)) + + def handle_HELP(self): + self.push_lit("""\ + 100 Legal commands + authinfo user Name|pass Password|generic + date + help + Report problems to + .""") + + def handle_STAT(self, message_spec=None): + if message_spec is None: + self.push_lit("412 No newsgroup selected") + elif message_spec == "3000234": + self.push_lit("223 3000234 <45223423@example.com>") + elif message_spec == "<45223423@example.com>": + self.push_lit("223 0 <45223423@example.com>") + else: + self.push_lit("430 No Such Article Found") + + def handle_NEXT(self): + self.push_lit("223 3000237 <668929@example.org> retrieved") + + def handle_LAST(self): + self.push_lit("223 3000234 <45223423@example.com> retrieved") + + def handle_LIST(self, action=None, param=None): + if action is None: + self.push_lit("""\ + 215 Newsgroups in form "group high low flags". + comp.lang.python 0000052340 0000002828 y + comp.lang.python.announce 0000001153 0000000993 m + free.it.comp.lang.python 0000000002 0000000002 y + fr.comp.lang.python 0000001254 0000000760 y + free.it.comp.lang.python.learner 0000000000 0000000001 y + tw.bbs.comp.lang.python 0000000304 0000000304 y + .""") + elif action == "ACTIVE": + if param == "*distutils*": + self.push_lit("""\ + 215 Newsgroups in form "group high low flags" + gmane.comp.python.distutils.devel 0000014104 0000000001 m + gmane.comp.python.distutils.cvs 0000000000 0000000001 m + .""") + else: + self.push_lit("""\ + 215 Newsgroups in form "group high low flags" + .""") + elif action == "OVERVIEW.FMT": + self.push_lit("""\ + 215 Order of fields in overview database. + Subject: + From: + Date: + Message-ID: + References: + Bytes: + Lines: + Xref:full + .""") + elif action == "NEWSGROUPS": + assert param is not None + if param == "comp.lang.python": + self.push_lit("""\ + 215 Descriptions in form "group description". + comp.lang.python\tThe Python computer language. + .""") + elif param == "comp.lang.python*": + self.push_lit("""\ + 215 Descriptions in form "group description". + comp.lang.python.announce\tAnnouncements about the Python language. (Moderated) + comp.lang.python\tThe Python computer language. + .""") + else: + self.push_lit("""\ + 215 Descriptions in form "group description". + .""") + else: + self.push_lit('501 Unknown LIST keyword') + + def handle_NEWNEWS(self, group, date_str, time_str): + # We hard code different return messages depending on passed + # argument and date syntax. + if (group == "comp.lang.python" and date_str == "20100913" + and time_str == "082004"): + # Date was passed in RFC 3977 format (NNTP "v2") + self.push_lit("""\ + 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows + + + .""") + elif (group == "comp.lang.python" and date_str == "100913" + and time_str == "082004"): + # Date was passed in RFC 977 format (NNTP "v1") + self.push_lit("""\ + 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows + + + .""") + elif (group == 'comp.lang.python' and + date_str in ('20100101', '100101') and + time_str == '090000'): + self.push_lit('too long line' * 3000 + + '\n.') + else: + self.push_lit("""\ + 230 An empty list of newsarticles follows + .""") + # (Note for experiments: many servers disable NEWNEWS. + # As of this writing, sicinfo3.epfl.ch doesn't.) + + def handle_XOVER(self, message_spec): + if message_spec == "57-59": + self.push_lit( + "224 Overview information for 57-58 follows\n" + "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout" + "\tDoug Hellmann " + "\tSat, 19 Jun 2010 18:04:08 -0400" + "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>" + "\t\t7103\t16" + "\tXref: news.gmane.org gmane.comp.python.authors:57" + "\n" + "58\tLooking for a few good bloggers" + "\tDoug Hellmann " + "\tThu, 22 Jul 2010 09:14:14 -0400" + "\t" + "\t\t6683\t16" + "\t" + "\n" + # A UTF-8 overview line from fr.comp.lang.python + "59\tRe: Message d'erreur incompréhensible (par moi)" + "\tEric Brunel " + "\tWed, 15 Sep 2010 18:09:15 +0200" + "\t" + "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27" + "\tXref: saria.nerim.net fr.comp.lang.python:1265" + "\n" + ".\n") + elif message_spec in {"1255-", "1255-1265", "1260-1265"}: + self.push_data( + b"224 Overview information follows\r\n" + b"1265\t\xD1on-UTF-8 s\xFCbject\t" + b'"Demo User" \t' + b"Sun, 19 Jun 2016 00:00:00 GMT\t" + b"<45223423@example.com>\t<45454@example.net>\t64\t3\r\n" + b".\r\n") + else: + self.push_lit("""\ + 224 No articles + .""") + + def handle_POST(self, *, body=None): + if body is None: + if self.allow_posting: + self.push_lit("340 Input article; end with .") + self.expect_body() + else: + self.push_lit("440 Posting not permitted") + else: + assert self.allow_posting + self.push_lit("240 Article received OK") + self.posted_body = body + + def handle_IHAVE(self, message_id, *, body=None): + if body is None: + if (self.allow_posting and + message_id == ""): + self.push_lit("335 Send it; end with .") + self.expect_body() + else: + self.push_lit("435 Article not wanted") + else: + assert self.allow_posting + self.push_lit("235 Article transferred OK") + self.posted_body = body + + sample_head = """\ + From: "Demo User" + Subject: I am just a test article + Content-Type: text/plain; charset=UTF-8; format=flowed + Message-ID: """ + + sample_body = """\ + This is just a test article. + ..Here is a dot-starting line. + + -- Signed by Andr\xe9.""" + + sample_article = sample_head + "\n\n" + sample_body + + nonutf8_head = b"Message-ID: <45223423@example.com>\r\n" + nonutf8_body = ( + b"Archive-Name: fr/chartes/comp.lang.python\r\n" + b"\r\n" + b"Testing n\xF6n-UTF-8\r\n" + ) + + def handle_ARTICLE(self, message_spec=None): + if message_spec is None: + self.push_lit("220 3000237 <45223423@example.com>") + elif message_spec == "<45223423@example.com>": + self.push_lit("220 0 <45223423@example.com>") + elif message_spec == "3000234": + self.push_lit("220 3000234 <45223423@example.com>") + elif message_spec == "1265": + self.push_data(b"220 1265 <45223423@example.com> article\r\n" + + self.nonutf8_head + + b"\r\n" + + self.nonutf8_body + + b".\r\n") + return + else: + self.push_lit("430 No Such Article Found") + return + self.push_lit(self.sample_article) + self.push_lit(".") + + def handle_HEAD(self, message_spec=None): + if message_spec is None: + self.push_lit("221 3000237 <45223423@example.com>") + elif message_spec == "<45223423@example.com>": + self.push_lit("221 0 <45223423@example.com>") + elif message_spec == "3000234": + self.push_lit("221 3000234 <45223423@example.com>") + elif message_spec == "1265": + self.push_data(b"221 1265 <45223423@example.com> head\r\n" + + self.nonutf8_head + + b".\r\n") + return + else: + self.push_lit("430 No Such Article Found") + return + self.push_lit(self.sample_head) + self.push_lit(".") + + def handle_BODY(self, message_spec=None): + if message_spec is None: + self.push_lit("222 3000237 <45223423@example.com>") + elif message_spec == "<45223423@example.com>": + self.push_lit("222 0 <45223423@example.com>") + elif message_spec == "3000234": + self.push_lit("222 3000234 <45223423@example.com>") + elif message_spec == "1265": + self.push_data(b"222 1265 <45223423@example.com> body\r\n" + + self.nonutf8_body + + b".\r\n") + return + else: + self.push_lit("430 No Such Article Found") + return + self.push_lit(self.sample_body) + self.push_lit(".") + + def handle_AUTHINFO(self, cred_type, data): + if self._logged_in: + self.push_lit('502 Already Logged In') + elif cred_type == 'user': + if self._user_sent: + self.push_lit('482 User Credential Already Sent') + else: + self.push_lit('381 Password Required') + self._user_sent = True + elif cred_type == 'pass': + if data != TEST_PASSWORD: + self.push_lit('481 Authentication failed') + return + self.push_lit('281 Login Successful') + self._logged_in = True + else: + raise Exception('Unknown cred type {}'.format(cred_type)) + + +class NNTPv2Handler(NNTPv1Handler): + """A handler for RFC 3977 (NNTP "v2")""" + + def handle_CAPABILITIES(self): + fmt = """\ + 101 Capability list: + VERSION 2 3 + IMPLEMENTATION INN 2.5.1{} + HDR + LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT + OVER + POST + READER + .""" + + if not self._logged_in: + self.push_lit(fmt.format('\n AUTHINFO USER')) + else: + self.push_lit(fmt.format('')) + + def handle_MODE(self, _): + raise Exception('MODE READER sent despite READER has been advertised') + + def handle_OVER(self, message_spec=None): + return self.handle_XOVER(message_spec) + + +class TcpServer(NNTPv2Handler): + def __init__(self): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, + socket.IPPROTO_TCP) + self.port = support.bind_port(self.sock) + self.sock.listen() + + def run_server(self): + with self.sock: + [client, _] = self.sock.accept() + with contextlib.ExitStack() as self.cleanup: + client = self.cleanup.enter_context(client) + self.run_connection(client) + + def run_connection(self, client): + self.client = client + self.reader = self.cleanup.enter_context(self.client.makefile('rb')) + self.start(self.reader.readline, self.client.sendall) + self.process_pending() + + def handle_CAPABILITIES(self): + self.push_lit( + '101 Capability list:\r\n' + 'VERSION 2\r\n' + 'AUTHINFO USER\r\n' + 'LIST ACTIVE NEWSGROUPS OVERVIEW.FMT\r\n' + 'OVER\r\n' + 'READER\r\n' + 'STARTTLS\r\n' + '.\r\n' + ) + + def handle_LIST(self, action='ACTIVE', param=None): + # Override to return non-ASCII group names + if action == 'ACTIVE': + result = { + None: 'test.ñon-äscii', + 'fr.comp.lang.*': 'fr.comp.lang.python', + 'comp.lang.*': 'comp.lang.python', + } + self.push_lit( + '215 Newsgroups as "name high low status"\r\n' + '{} 3002322 3000234 y\r\n' + '.\r\n'.format(result[param])) + elif action == 'NEWSGROUPS': + results = { + 'fr.comp.lang.python': + 'fr.comp.lang.python\t' + 'Langage de programmation Python.\r\n', + 'comp.lang.python': + 'comp.lang.python\tThe Python computer language.\r\n', + 'fr.comp.lang.*': + 'fr.comp.lang.c\t\t' + 'Langages de programmation C et assimilés.\r\n' + 'fr.comp.lang.python\t' + 'Langage de programmation Python.\r\n', + 'comp.lang.*': + 'comp.lang.c\t\tDiscussion about C.\r\n' + 'comp.lang.python\tThe Python computer language.\r\n', + } + results = results.get(param, '') + self.push_lit('215 Newsgroups as "name description"\r\n' + + results + '.\r\n') + else: + super().handle_LIST(action, param) + + def handle_NEWGROUPS(self, date, time): + self.push_lit('231 New newsgroups:\r\n' + 'test.ñon-äscii 3002322 3000234 y\r\n' + '.\r\n') + + def handle_STARTTLS(self): + self.reader.close() + self.push_lit('382 Begin TLS negotiation now\r\n') + self.client = ssl.wrap_socket(self.client, + server_side=True, certfile=certfile) + self.cleanup.enter_context(self.client) + self.reader = self.cleanup.enter_context(self.client.makefile('rb')) + self._readline = self.reader.readline + self._push_data = self.client.sendall + + def handle_XHDR(self, header, arg): + assert header == 'subject' + assert arg == '1265' + self.push_lit( + '221 Header follows\r\n' + '1265 Dummy subject\r\n' + '.\r\n' + ) + + +class SslServer(TcpServer): + def run_connection(self, client): + client = ssl.wrap_socket(client, server_side=True, certfile=certfile) + super().run_connection(client) + + class NetworkedNNTPTestsMixin: def test_welcome(self): @@ -41,15 +524,13 @@ def test_help(self): def test_list(self): resp, groups = self.server.list() - if len(groups) > 0: - self.assertEqual(GroupInfo, type(groups[0])) - self.assertEqual(str, type(groups[0].group)) + self.assertEqual(GroupInfo, type(groups[0])) + self.assertEqual(str, type(groups[0].group)) def test_list_active(self): resp, groups = self.server.list(self.GROUP_PAT) - if len(groups) > 0: - self.assertEqual(GroupInfo, type(groups[0])) - self.assertEqual(str, type(groups[0].group)) + self.assertEqual(GroupInfo, type(groups[0])) + self.assertEqual(str, type(groups[0].group)) def test_unknown_command(self): with self.assertRaises(nntplib.NNTPPermanentError) as cm: @@ -58,13 +539,10 @@ def test_unknown_command(self): self.assertTrue(resp.startswith("500 "), resp) def test_newgroups(self): - # gmane gets a constant influx of new groups. In order not to stress - # the server too much, we choose a recent date in the past. dt = datetime.date.today() - datetime.timedelta(days=7) resp, groups = self.server.newgroups(dt) - if len(groups) > 0: - self.assertIsInstance(groups[0], GroupInfo) - self.assertIsInstance(groups[0].group, str) + self.assertIsInstance(groups[0], GroupInfo) + self.assertIsInstance(groups[0].group, str) def test_description(self): def _check_desc(desc): @@ -84,9 +562,8 @@ def _check_desc(desc): def test_descriptions(self): resp, descs = self.server.descriptions(self.GROUP_PAT) - # 215 for LIST NEWSGROUPS, 282 for XGTITLE - self.assertTrue( - resp.startswith("215 ") or resp.startswith("282 "), resp) + # 215 for LIST NEWSGROUPS + self.assertTrue(resp.startswith("215 ")) self.assertIsInstance(descs, dict) desc = descs[self.GROUP_NAME] self.assertEqual(desc, self.server.description(self.GROUP_NAME)) @@ -118,642 +595,263 @@ def _check_art_dict(self, art_dict): "references", ":bytes", ":lines"} ) for v in art_dict.values(): - self.assertIsInstance(v, (str, type(None))) - - def test_xover(self): - resp, count, first, last, name = self.server.group(self.GROUP_NAME) - resp, lines = self.server.xover(last - 5, last) - if len(lines) == 0: - self.skipTest("no articles retrieved") - # The 'last' article is not necessarily part of the output (cancelled?) - art_num, art_dict = lines[0] - self.assertGreaterEqual(art_num, last - 5) - self.assertLessEqual(art_num, last) - self._check_art_dict(art_dict) - - @unittest.skipIf(True, 'temporarily skipped until a permanent solution' - ' is found for issue #28971') - def test_over(self): - resp, count, first, last, name = self.server.group(self.GROUP_NAME) - start = last - 10 - # The "start-" article range form - resp, lines = self.server.over((start, None)) - art_num, art_dict = lines[0] - self._check_art_dict(art_dict) - # The "start-end" article range form - resp, lines = self.server.over((start, last)) - art_num, art_dict = lines[-1] - # The 'last' article is not necessarily part of the output (cancelled?) - self.assertGreaterEqual(art_num, start) - self.assertLessEqual(art_num, last) - self._check_art_dict(art_dict) - # XXX The "message_id" form is unsupported by gmane - # 503 Overview by message-ID unsupported - - def test_xhdr(self): - resp, count, first, last, name = self.server.group(self.GROUP_NAME) - resp, lines = self.server.xhdr('subject', last) - for line in lines: - self.assertEqual(str, type(line[1])) - - def check_article_resp(self, resp, article, art_num=None): - self.assertIsInstance(article, nntplib.ArticleInfo) - if art_num is not None: - self.assertEqual(article.number, art_num) - for line in article.lines: - self.assertIsInstance(line, bytes) - # XXX this could exceptionally happen... - self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n")) - - @unittest.skipIf(True, "FIXME: see bpo-32128") - def test_article_head_body(self): - resp, count, first, last, name = self.server.group(self.GROUP_NAME) - # Try to find an available article - for art_num in (last, first, last - 1): - try: - resp, head = self.server.head(art_num) - except nntplib.NNTPTemporaryError as e: - if not e.response.startswith("423 "): - raise - # "423 No such article" => choose another one - continue - break - else: - self.skipTest("could not find a suitable article number") - self.assertTrue(resp.startswith("221 "), resp) - self.check_article_resp(resp, head, art_num) - resp, body = self.server.body(art_num) - self.assertTrue(resp.startswith("222 "), resp) - self.check_article_resp(resp, body, art_num) - resp, article = self.server.article(art_num) - self.assertTrue(resp.startswith("220 "), resp) - self.check_article_resp(resp, article, art_num) - # Tolerate running the tests from behind a NNTP virus checker - blacklist = lambda line: line.startswith(b'X-Antivirus') - filtered_head_lines = [line for line in head.lines - if not blacklist(line)] - filtered_lines = [line for line in article.lines - if not blacklist(line)] - self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines) - - def test_capabilities(self): - # The server under test implements NNTP version 2 and has a - # couple of well-known capabilities. Just sanity check that we - # got them. - def _check_caps(caps): - caps_list = caps['LIST'] - self.assertIsInstance(caps_list, (list, tuple)) - self.assertIn('OVERVIEW.FMT', caps_list) - self.assertGreaterEqual(self.server.nntp_version, 2) - _check_caps(self.server.getcapabilities()) - # This re-emits the command - resp, caps = self.server.capabilities() - _check_caps(caps) - - def test_zlogin(self): - # This test must be the penultimate because further commands will be - # refused. - baduser = "notarealuser" - badpw = "notarealpassword" - # Check that bogus credentials cause failure - self.assertRaises(nntplib.NNTPError, self.server.login, - user=baduser, password=badpw, usenetrc=False) - # FIXME: We should check that correct credentials succeed, but that - # would require valid details for some server somewhere to be in the - # test suite, I think. Gmane is anonymous, at least as used for the - # other tests. - - def test_zzquit(self): - # This test must be called last, hence the name - cls = type(self) - try: - self.server.quit() - finally: - cls.server = None - - @classmethod - def wrap_methods(cls): - # Wrap all methods in a transient_internet() exception catcher - # XXX put a generic version in test.support? - def wrap_meth(meth): - @functools.wraps(meth) - def wrapped(self): - with support.transient_internet(self.NNTP_HOST): - meth(self) - return wrapped - for name in dir(cls): - if not name.startswith('test_'): - continue - meth = getattr(cls, name) - if not callable(meth): - continue - # Need to use a closure so that meth remains bound to its current - # value - setattr(cls, name, wrap_meth(meth)) - - def test_with_statement(self): - def is_connected(): - if not hasattr(server, 'file'): - return False - try: - server.help() - except (OSError, EOFError): - return False - return True - - with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server: - self.assertTrue(is_connected()) - self.assertTrue(server.help()) - self.assertFalse(is_connected()) - - with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server: - server.quit() - self.assertFalse(is_connected()) - - -NetworkedNNTPTestsMixin.wrap_methods() - - -EOF_ERRORS = (EOFError,) -if ssl is not None: - EOF_ERRORS += (ssl.SSLEOFError,) - - -class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase): - # This server supports STARTTLS (gmane doesn't) - NNTP_HOST = 'news.trigofacile.com' - GROUP_NAME = 'fr.comp.lang.python' - GROUP_PAT = 'fr.comp.lang.*' - - NNTP_CLASS = NNTP - - @classmethod - def setUpClass(cls): - support.requires("network") - with support.transient_internet(cls.NNTP_HOST): - try: - cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT, - usenetrc=False) - except EOF_ERRORS: - raise unittest.SkipTest(f"{cls} got EOF error on connecting " - f"to {cls.NNTP_HOST!r}") - - @classmethod - def tearDownClass(cls): - if cls.server is not None: - cls.server.quit() - -@unittest.skipUnless(ssl, 'requires SSL support') -class NetworkedNNTP_SSLTests(NetworkedNNTPTests): - - # Technical limits for this public NNTP server (see http://www.aioe.org): - # "Only two concurrent connections per IP address are allowed and - # 400 connections per day are accepted from each IP address." - - NNTP_HOST = 'nntp.aioe.org' - GROUP_NAME = 'comp.lang.python' - GROUP_PAT = 'comp.lang.*' - - NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None) - - # Disabled as it produces too much data - test_list = None - - # Disabled as the connection will already be encrypted. - test_starttls = None - - -# -# Non-networked tests using a local server (or something mocking it). -# - -class _NNTPServerIO(io.RawIOBase): - """A raw IO object allowing NNTP commands to be received and processed - by a handler. The handler can push responses which can then be read - from the IO object.""" - - def __init__(self, handler): - io.RawIOBase.__init__(self) - # The channel from the client - self.c2s = io.BytesIO() - # The channel to the client - self.s2c = io.BytesIO() - self.handler = handler - self.handler.start(self.c2s.readline, self.push_data) - - def readable(self): - return True - - def writable(self): - return True - - def push_data(self, data): - """Push (buffer) some data to send to the client.""" - pos = self.s2c.tell() - self.s2c.seek(0, 2) - self.s2c.write(data) - self.s2c.seek(pos) - - def write(self, b): - """The client sends us some data""" - pos = self.c2s.tell() - self.c2s.write(b) - self.c2s.seek(pos) - self.handler.process_pending() - return len(b) - - def readinto(self, buf): - """The client wants to read a response""" - self.handler.process_pending() - b = self.s2c.read(len(buf)) - n = len(b) - buf[:n] = b - return n - - -def make_mock_file(handler): - sio = _NNTPServerIO(handler) - # Using BufferedRWPair instead of BufferedRandom ensures the file - # isn't seekable. - file = io.BufferedRWPair(sio, sio) - return (sio, file) - - -class MockedNNTPTestsMixin: - # Override in derived classes - handler_class = None - - def setUp(self): - super().setUp() - self.make_server() - - def tearDown(self): - super().tearDown() - del self.server - - def make_server(self, *args, **kwargs): - self.handler = self.handler_class() - self.sio, file = make_mock_file(self.handler) - self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs) - return self.server - - -class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin): - def setUp(self): - super().setUp() - self.make_server(readermode=True) - + self.assertIsInstance(v, (str, type(None))) -class NNTPv1Handler: - """A handler for RFC 977""" + def test_xover(self): + resp, count, first, last, name = self.server.group(self.GROUP_NAME) + resp, lines = self.server.xover(last - 5, last) + # The 'last' article is not necessarily part of the output (cancelled?) + art_num, art_dict = lines[0] + self.assertGreaterEqual(art_num, last - 5) + self.assertLessEqual(art_num, last) + self._check_art_dict(art_dict) - welcome = "200 NNTP mock server" + def test_over(self): + resp, count, first, last, name = self.server.group(self.GROUP_NAME) + start = last - 10 + # The "start-" article range form + resp, lines = self.server.over((start, None)) + art_num, art_dict = lines[0] + self._check_art_dict(art_dict) + # The "start-end" article range form + resp, lines = self.server.over((start, last)) + art_num, art_dict = lines[-1] + # The 'last' article is not necessarily part of the output (cancelled?) + self.assertGreaterEqual(art_num, start) + self.assertLessEqual(art_num, last) + self._check_art_dict(art_dict) + # XXX The "message_id" form is unsupported by gmane + # 503 Overview by message-ID unsupported - def start(self, readline, push_data): - self.in_body = False - self.allow_posting = True - self._readline = readline - self._push_data = push_data - self._logged_in = False - self._user_sent = False - # Our welcome - self.handle_welcome() + def test_xhdr(self): + resp, count, first, last, name = self.server.group(self.GROUP_NAME) + resp, lines = self.server.xhdr('subject', last) + for line in lines: + self.assertEqual(str, type(line[1])) - def _decode(self, data): - return str(data, "utf-8", "surrogateescape") + def check_article_resp(self, resp, article, art_num=None): + self.assertIsInstance(article, nntplib.ArticleInfo) + if art_num is not None: + self.assertEqual(article.number, art_num) + for line in article.lines: + self.assertIsInstance(line, bytes) + # XXX this could exceptionally happen... + self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n")) - def process_pending(self): - if self.in_body: - while True: - line = self._readline() - if not line: - return - self.body.append(line) - if line == b".\r\n": - break - try: - meth, tokens = self.body_callback - meth(*tokens, body=self.body) - finally: - self.body_callback = None - self.body = None - self.in_body = False - while True: - line = self._decode(self._readline()) - if not line: - return - if not line.endswith("\r\n"): - raise ValueError("line doesn't end with \\r\\n: {!r}".format(line)) - line = line[:-2] - cmd, *tokens = line.split() - #meth = getattr(self.handler, "handle_" + cmd.upper(), None) - meth = getattr(self, "handle_" + cmd.upper(), None) - if meth is None: - self.handle_unknown() - else: - try: - meth(*tokens) - except Exception as e: - raise ValueError("command failed: {!r}".format(line)) from e - else: - if self.in_body: - self.body_callback = meth, tokens - self.body = [] + def test_article_head_body(self): + resp, count, first, last, name = self.server.group(self.GROUP_NAME) + # Try to find an available article + art_num = last + resp, head = self.server.head(art_num) + self.assertTrue(resp.startswith("221 "), resp) + self.check_article_resp(resp, head, art_num) + resp, body = self.server.body(art_num) + self.assertTrue(resp.startswith("222 "), resp) + self.check_article_resp(resp, body, art_num) + resp, article = self.server.article(art_num) + self.assertTrue(resp.startswith("220 "), resp) + self.check_article_resp(resp, article, art_num) + # Tolerate running the tests from behind a NNTP virus checker + blacklist = lambda line: line.startswith(b'X-Antivirus') + filtered_head_lines = [line for line in head.lines + if not blacklist(line)] + filtered_lines = [line for line in article.lines + if not blacklist(line)] + self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines) - def expect_body(self): - """Flag that the client is expected to post a request body""" - self.in_body = True + def test_capabilities(self): + # The server under test implements NNTP version 2 and has a + # couple of well-known capabilities. Just sanity check that we + # got them. + def _check_caps(caps): + caps_list = caps['LIST'] + self.assertIsInstance(caps_list, (list, tuple)) + self.assertIn('OVERVIEW.FMT', caps_list) + self.assertGreaterEqual(self.server.nntp_version, 2) + _check_caps(self.server.getcapabilities()) + # This re-emits the command + resp, caps = self.server.capabilities() + _check_caps(caps) - def push_data(self, data): - """Push some binary data""" - self._push_data(data) + @unittest.skipUnless(ssl, 'requires SSL support') + def test_starttls(self): + file = self.server.file + sock = self.server.sock + self.server.starttls() + # Check that the socket and internal pseudo-file really were + # changed. + self.assertNotEqual(file, self.server.file) + self.assertNotEqual(sock, self.server.sock) + # Check that the new socket really is an SSL one + self.assertIsInstance(self.server.sock, ssl.SSLSocket) + # Check that trying starttls when it's already active fails. + self.assertRaises(ValueError, self.server.starttls) - def push_lit(self, lit): - """Push a string literal""" - lit = textwrap.dedent(lit) - lit = "\r\n".join(lit.splitlines()) + "\r\n" - lit = lit.encode('utf-8') - self.push_data(lit) + def test_zlogin(self): + # This test must be the penultimate because further commands will be + # refused. + baduser = "notarealuser" + badpw = "notarealpassword" + # Check that bogus credentials cause failure + self.assertRaises(nntplib.NNTPTemporaryError, self.server.login, + user=baduser, password=badpw, usenetrc=False) + # FIXME: We should check that correct credentials succeed. - def handle_unknown(self): - self.push_lit("500 What?") + def test_zzquit(self): + # This test must be called last, hence the name + cls = type(self) + try: + self.server.quit() + finally: + cls.server = None - def handle_welcome(self): - self.push_lit(self.welcome) + def test_with_statement(self): + def is_connected(): + if not hasattr(server, 'file'): + return False + try: + server.help() + except (OSError, EOFError): + return False + return True - def handle_QUIT(self): - self.push_lit("205 Bye!") + with support.transient_internet(self.NNTP_HOST): + with self.NNTP_CLASS(self.NNTP_HOST, + timeout=TIMEOUT, usenetrc=False) as server: + self.assertTrue(is_connected()) + self.assertTrue(server.help()) + self.assertFalse(is_connected()) - def handle_DATE(self): - self.push_lit("111 20100914001155") + with self.NNTP_CLASS(self.NNTP_HOST, + timeout=TIMEOUT, usenetrc=False) as server: + server.quit() + self.assertFalse(is_connected()) - def handle_GROUP(self, group): - if group == "fr.comp.lang.python": - self.push_lit("211 486 761 1265 fr.comp.lang.python") - else: - self.push_lit("411 No such group {}".format(group)) - def handle_HELP(self): - self.push_lit("""\ - 100 Legal commands - authinfo user Name|pass Password|generic - date - help - Report problems to - .""") +@unittest.skipUnless(threading, 'requires multithreading') +class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase): + NNTP_HOST = 'news.trigofacile.com' + GROUP_NAME = 'fr.comp.lang.python' + GROUP_PAT = 'fr.comp.lang.*' - def handle_STAT(self, message_spec=None): - if message_spec is None: - self.push_lit("412 No newsgroup selected") - elif message_spec == "3000234": - self.push_lit("223 3000234 <45223423@example.com>") - elif message_spec == "<45223423@example.com>": - self.push_lit("223 0 <45223423@example.com>") - else: - self.push_lit("430 No Such Article Found") + NNTP_CLASS = NNTP + SERVER_CLASS = TcpServer - def handle_NEXT(self): - self.push_lit("223 3000237 <668929@example.org> retrieved") + @classmethod + def setUpClass(cls): + server = cls.SERVER_CLASS() + cls.background = threading.Thread(target=server.run_server) + cls.background.start() + cls.server = cls.NNTP_CLASS(support.HOST, server.port, + timeout=TIMEOUT, usenetrc=False) - def handle_LAST(self): - self.push_lit("223 3000234 <45223423@example.com> retrieved") + @classmethod + def tearDownClass(cls): + if cls.server is not None: + cls.server.quit() + cls.background.join() + # Explicitly break the reference cycle to avoid having + # dangling threads. + cls.background = None - def handle_LIST(self, action=None, param=None): - if action is None: - self.push_lit("""\ - 215 Newsgroups in form "group high low flags". - comp.lang.python 0000052340 0000002828 y - comp.lang.python.announce 0000001153 0000000993 m - free.it.comp.lang.python 0000000002 0000000002 y - fr.comp.lang.python 0000001254 0000000760 y - free.it.comp.lang.python.learner 0000000000 0000000001 y - tw.bbs.comp.lang.python 0000000304 0000000304 y - .""") - elif action == "ACTIVE": - if param == "*distutils*": - self.push_lit("""\ - 215 Newsgroups in form "group high low flags" - gmane.comp.python.distutils.devel 0000014104 0000000001 m - gmane.comp.python.distutils.cvs 0000000000 0000000001 m - .""") - else: - self.push_lit("""\ - 215 Newsgroups in form "group high low flags" - .""") - elif action == "OVERVIEW.FMT": - self.push_lit("""\ - 215 Order of fields in overview database. - Subject: - From: - Date: - Message-ID: - References: - Bytes: - Lines: - Xref:full - .""") - elif action == "NEWSGROUPS": - assert param is not None - if param == "comp.lang.python": - self.push_lit("""\ - 215 Descriptions in form "group description". - comp.lang.python\tThe Python computer language. - .""") - elif param == "comp.lang.python*": - self.push_lit("""\ - 215 Descriptions in form "group description". - comp.lang.python.announce\tAnnouncements about the Python language. (Moderated) - comp.lang.python\tThe Python computer language. - .""") - else: - self.push_lit("""\ - 215 Descriptions in form "group description". - .""") - else: - self.push_lit('501 Unknown LIST keyword') - def handle_NEWNEWS(self, group, date_str, time_str): - # We hard code different return messages depending on passed - # argument and date syntax. - if (group == "comp.lang.python" and date_str == "20100913" - and time_str == "082004"): - # Date was passed in RFC 3977 format (NNTP "v2") - self.push_lit("""\ - 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows - - - .""") - elif (group == "comp.lang.python" and date_str == "100913" - and time_str == "082004"): - # Date was passed in RFC 977 format (NNTP "v1") - self.push_lit("""\ - 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows - - - .""") - elif (group == 'comp.lang.python' and - date_str in ('20100101', '100101') and - time_str == '090000'): - self.push_lit('too long line' * 3000 + - '\n.') - else: - self.push_lit("""\ - 230 An empty list of newsarticles follows - .""") - # (Note for experiments: many servers disable NEWNEWS. - # As of this writing, sicinfo3.epfl.ch doesn't.) +@unittest.skipUnless(ssl, 'requires SSL support') +class NetworkedNNTP_SSLTests(NetworkedNNTPTests): - def handle_XOVER(self, message_spec): - if message_spec == "57-59": - self.push_lit( - "224 Overview information for 57-58 follows\n" - "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout" - "\tDoug Hellmann " - "\tSat, 19 Jun 2010 18:04:08 -0400" - "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>" - "\t\t7103\t16" - "\tXref: news.gmane.org gmane.comp.python.authors:57" - "\n" - "58\tLooking for a few good bloggers" - "\tDoug Hellmann " - "\tThu, 22 Jul 2010 09:14:14 -0400" - "\t" - "\t\t6683\t16" - "\t" - "\n" - # A UTF-8 overview line from fr.comp.lang.python - "59\tRe: Message d'erreur incompréhensible (par moi)" - "\tEric Brunel " - "\tWed, 15 Sep 2010 18:09:15 +0200" - "\t" - "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27" - "\tXref: saria.nerim.net fr.comp.lang.python:1265" - "\n" - ".\n") - else: - self.push_lit("""\ - 224 No articles - .""") + # Technical limits for this public NNTP server (see http://www.aioe.org): + # "Only two concurrent connections per IP address are allowed and + # 400 connections per day are accepted from each IP address." - def handle_POST(self, *, body=None): - if body is None: - if self.allow_posting: - self.push_lit("340 Input article; end with .") - self.expect_body() - else: - self.push_lit("440 Posting not permitted") - else: - assert self.allow_posting - self.push_lit("240 Article received OK") - self.posted_body = body + NNTP_HOST = 'nntp.aioe.org' + GROUP_NAME = 'comp.lang.python' + GROUP_PAT = 'comp.lang.*' - def handle_IHAVE(self, message_id, *, body=None): - if body is None: - if (self.allow_posting and - message_id == ""): - self.push_lit("335 Send it; end with .") - self.expect_body() - else: - self.push_lit("435 Article not wanted") - else: - assert self.allow_posting - self.push_lit("235 Article transferred OK") - self.posted_body = body + NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None) + SERVER_CLASS = SslServer - sample_head = """\ - From: "Demo User" - Subject: I am just a test article - Content-Type: text/plain; charset=UTF-8; format=flowed - Message-ID: """ + # Disabled as the connection will already be encrypted. + test_starttls = None - sample_body = """\ - This is just a test article. - ..Here is a dot-starting line. - -- Signed by Andr\xe9.""" +# +# Non-networked tests using something mocking a local server. +# - sample_article = sample_head + "\n\n" + sample_body +class _NNTPServerIO(io.RawIOBase): + """A raw IO object allowing NNTP commands to be received and processed + by a handler. The handler can push responses which can then be read + from the IO object.""" - def handle_ARTICLE(self, message_spec=None): - if message_spec is None: - self.push_lit("220 3000237 <45223423@example.com>") - elif message_spec == "<45223423@example.com>": - self.push_lit("220 0 <45223423@example.com>") - elif message_spec == "3000234": - self.push_lit("220 3000234 <45223423@example.com>") - else: - self.push_lit("430 No Such Article Found") - return - self.push_lit(self.sample_article) - self.push_lit(".") + def __init__(self, handler): + io.RawIOBase.__init__(self) + # The channel from the client + self.c2s = io.BytesIO() + # The channel to the client + self.s2c = io.BytesIO() + self.handler = handler + self.handler.start(self.c2s.readline, self.push_data) - def handle_HEAD(self, message_spec=None): - if message_spec is None: - self.push_lit("221 3000237 <45223423@example.com>") - elif message_spec == "<45223423@example.com>": - self.push_lit("221 0 <45223423@example.com>") - elif message_spec == "3000234": - self.push_lit("221 3000234 <45223423@example.com>") - else: - self.push_lit("430 No Such Article Found") - return - self.push_lit(self.sample_head) - self.push_lit(".") + def readable(self): + return True - def handle_BODY(self, message_spec=None): - if message_spec is None: - self.push_lit("222 3000237 <45223423@example.com>") - elif message_spec == "<45223423@example.com>": - self.push_lit("222 0 <45223423@example.com>") - elif message_spec == "3000234": - self.push_lit("222 3000234 <45223423@example.com>") - else: - self.push_lit("430 No Such Article Found") - return - self.push_lit(self.sample_body) - self.push_lit(".") + def writable(self): + return True - def handle_AUTHINFO(self, cred_type, data): - if self._logged_in: - self.push_lit('502 Already Logged In') - elif cred_type == 'user': - if self._user_sent: - self.push_lit('482 User Credential Already Sent') - else: - self.push_lit('381 Password Required') - self._user_sent = True - elif cred_type == 'pass': - self.push_lit('281 Login Successful') - self._logged_in = True - else: - raise Exception('Unknown cred type {}'.format(cred_type)) + def push_data(self, data): + """Push (buffer) some data to send to the client.""" + pos = self.s2c.tell() + self.s2c.seek(0, 2) + self.s2c.write(data) + self.s2c.seek(pos) + def write(self, b): + """The client sends us some data""" + pos = self.c2s.tell() + self.c2s.write(b) + self.c2s.seek(pos) + self.handler.process_pending() + return len(b) -class NNTPv2Handler(NNTPv1Handler): - """A handler for RFC 3977 (NNTP "v2")""" + def readinto(self, buf): + """The client wants to read a response""" + self.handler.process_pending() + b = self.s2c.read(len(buf)) + n = len(b) + buf[:n] = b + return n - def handle_CAPABILITIES(self): - fmt = """\ - 101 Capability list: - VERSION 2 3 - IMPLEMENTATION INN 2.5.1{} - HDR - LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT - OVER - POST - READER - .""" - if not self._logged_in: - self.push_lit(fmt.format('\n AUTHINFO USER')) - else: - self.push_lit(fmt.format('')) +def make_mock_file(handler): + sio = _NNTPServerIO(handler) + # Using BufferedRWPair instead of BufferedRandom ensures the file + # isn't seekable. + file = io.BufferedRWPair(sio, sio) + return (sio, file) - def handle_MODE(self, _): - raise Exception('MODE READER sent despite READER has been advertised') - def handle_OVER(self, message_spec=None): - return self.handle_XOVER(message_spec) +class MockedNNTPTestsMixin: + # Override in derived classes + handler_class = None + + def setUp(self): + super().setUp() + self.make_server() + + def tearDown(self): + super().tearDown() + del self.server + + def make_server(self, *args, **kwargs): + self.handler = self.handler_class() + self.sio, file = make_mock_file(self.handler) + self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs) + return self.server + + +class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin): + def setUp(self): + super().setUp() + self.make_server(readermode=True) class CapsAfterLoginNNTPv2Handler(NNTPv2Handler): @@ -1521,65 +1619,5 @@ def nntp_class(*pos, **kw): return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw) -class LocalServerTests(unittest.TestCase): - def setUp(self): - sock = socket.socket() - port = support.bind_port(sock) - sock.listen() - self.background = threading.Thread( - target=self.run_server, args=(sock,)) - self.background.start() - self.addCleanup(self.background.join) - - self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__() - self.addCleanup(self.nntp.__exit__, None, None, None) - - def run_server(self, sock): - # Could be generalized to handle more commands in separate methods - with sock: - [client, _] = sock.accept() - with contextlib.ExitStack() as cleanup: - cleanup.enter_context(client) - reader = cleanup.enter_context(client.makefile('rb')) - client.sendall(b'200 Server ready\r\n') - while True: - cmd = reader.readline() - if cmd == b'CAPABILITIES\r\n': - client.sendall( - b'101 Capability list:\r\n' - b'VERSION 2\r\n' - b'STARTTLS\r\n' - b'.\r\n' - ) - elif cmd == b'STARTTLS\r\n': - reader.close() - client.sendall(b'382 Begin TLS negotiation now\r\n') - context = ssl.SSLContext() - context.load_cert_chain(certfile) - client = context.wrap_socket( - client, server_side=True) - cleanup.enter_context(client) - reader = cleanup.enter_context(client.makefile('rb')) - elif cmd == b'QUIT\r\n': - client.sendall(b'205 Bye!\r\n') - break - else: - raise ValueError('Unexpected command {!r}'.format(cmd)) - - @unittest.skipUnless(ssl, 'requires SSL support') - def test_starttls(self): - file = self.nntp.file - sock = self.nntp.sock - self.nntp.starttls() - # Check that the socket and internal pseudo-file really were - # changed. - self.assertNotEqual(file, self.nntp.file) - self.assertNotEqual(sock, self.nntp.sock) - # Check that the new socket really is an SSL one - self.assertIsInstance(self.nntp.sock, ssl.SSLSocket) - # Check that trying starttls when it's already active fails. - self.assertRaises(ValueError, self.nntp.starttls) - - if __name__ == "__main__": unittest.main()