From 8822847ce866f6c7aedb928a8b5a979bdf08216d Mon Sep 17 00:00:00 2001 From: Stephan Richter Date: Thu, 12 Aug 2004 19:56:31 +0000 Subject: [PATCH] Backported r26559 | srichter | 2004-07-15 17:22:32 -0400 (Thu, 15 Jul 2004) | 2 lines r26560 | srichter | 2004-07-15 17:38:42 -0400 (Thu, 15 Jul 2004) | 2 lines --- adjustments.py | 65 +++++++ buffers.py | 237 +++++++++++++++++++++++ fixedstreamreceiver.py | 52 +++++ ftp/logger.py | 3 +- ftp/publisher.py | 161 ++++++++++++++++ ftp/server.py | 4 +- ftp/tests/fstests.py | 156 +++++++++++++++ ftp/tests/test_publisher.py | 8 +- http/chunking.py | 108 +++++++++++ http/commonaccesslogger.py | 4 +- http/httprequestparser.py | 204 ++++++++++++++++++++ http/httptask.py | 236 +++++++++++++++++++++++ http/tests/test_httpserver.py | 293 +++++++++++++++++++++++++++++ http/tests/test_publisherserver.py | 192 +++++++++++++++++++ linereceiver/linecommandparser.py | 70 +++++++ linereceiver/linetask.py | 69 +++++++ logger/filelogger.py | 71 +++++++ logger/m_syslog.py | 177 +++++++++++++++++ logger/pythonlogger.py | 2 +- logger/resolvinglogger.py | 52 +++++ logger/taillogger.py | 42 +++++ logger/unresolvinglogger.py | 31 +++ taskthreads.py | 122 ++++++++++++ 23 files changed, 2348 insertions(+), 11 deletions(-) create mode 100644 adjustments.py create mode 100644 buffers.py create mode 100644 fixedstreamreceiver.py create mode 100644 ftp/publisher.py create mode 100644 ftp/tests/fstests.py create mode 100644 http/chunking.py create mode 100644 http/httprequestparser.py create mode 100644 http/httptask.py create mode 100644 http/tests/test_httpserver.py create mode 100644 http/tests/test_publisherserver.py create mode 100644 linereceiver/linecommandparser.py create mode 100644 linereceiver/linetask.py create mode 100644 logger/filelogger.py create mode 100644 logger/m_syslog.py create mode 100644 logger/resolvinglogger.py create mode 100644 logger/taillogger.py create mode 100644 logger/unresolvinglogger.py create mode 100644 taskthreads.py diff --git a/adjustments.py b/adjustments.py new file mode 100644 index 0000000..c8a6944 --- /dev/null +++ b/adjustments.py @@ -0,0 +1,65 @@ +############################################################################## +# +# Copyright (c) 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""Adjustments are tunable parameters. + +$Id$ +""" +from zope.server import maxsockets + + +class Adjustments(object): + """This class contains tunable communication parameters. + + You can either change default_adj to adjust parameters for + all sockets, or you can create a new instance of this class, + change its attributes, and pass it to the channel constructors. + """ + + # backlog is the argument to pass to socket.listen(). + backlog = 1024 + + # recv_bytes is the argument to pass to socket.recv(). + recv_bytes = 8192 + + # send_bytes is the number of bytes to send to socket.send(). + send_bytes = 8192 + + # copy_bytes is the number of bytes to copy from one file to another. 
+ copy_bytes = 65536 + + # Create a tempfile if the pending output data gets larger + # than outbuf_overflow. With RAM so cheap, this probably + # ought to be set to the 16-32 MB range (circa 2001) for + # good performance with big transfers. The default is + # conservative. + outbuf_overflow = 1050000 + + # Create a tempfile if the data received gets larger + # than inbuf_overflow. + inbuf_overflow = 525000 + + # Stop accepting new connections if too many are already active. + connection_limit = maxsockets.max_select_sockets() - 3 # Safe + + # Minimum seconds between cleaning up inactive channels. + cleanup_interval = 300 + + # Maximum seconds to leave an inactive connection open. + channel_timeout = 900 + + # Boolean: turn off to not log premature client disconnects. + log_socket_errors = 1 + + +default_adj = Adjustments() diff --git a/buffers.py b/buffers.py new file mode 100644 index 0000000..27cbf5b --- /dev/null +++ b/buffers.py @@ -0,0 +1,237 @@ +############################################################################## +# +# Copyright (c) 2001-2004 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""Buffers + +$Id$ +""" +try: + from cStringIO import StringIO +except ImportError: + from StringIO import StringIO + + +# copy_bytes controls the size of temp. strings for shuffling data around. +COPY_BYTES = 1 << 18 # 256K + +# The maximum number of bytes to buffer in a simple string. +STRBUF_LIMIT = 8192 + + +class FileBasedBuffer(object): + + remain = 0 + + def __init__(self, file, from_buffer=None): + self.file = file + if from_buffer is not None: + from_file = from_buffer.getfile() + read_pos = from_file.tell() + from_file.seek(0) + while 1: + data = from_file.read(COPY_BYTES) + if not data: + break + file.write(data) + self.remain = int(file.tell() - read_pos) + from_file.seek(read_pos) + file.seek(read_pos) + + def __len__(self): + return self.remain + + def append(self, s): + file = self.file + read_pos = file.tell() + file.seek(0, 2) + file.write(s) + file.seek(read_pos) + self.remain = self.remain + len(s) + + def get(self, bytes=-1, skip=0): + file = self.file + if not skip: + read_pos = file.tell() + if bytes < 0: + # Read all + res = file.read() + else: + res = file.read(bytes) + if skip: + self.remain -= len(res) + else: + file.seek(read_pos) + return res + + def skip(self, bytes, allow_prune=0): + if self.remain < bytes: + raise ValueError, ( + "Can't skip %d bytes in buffer of %d bytes" % + (bytes, self.remain)) + self.file.seek(bytes, 1) + self.remain = self.remain - bytes + + def newfile(self): + raise NotImplementedError() + + def prune(self): + file = self.file + if self.remain == 0: + read_pos = file.tell() + file.seek(0, 2) + sz = file.tell() + file.seek(read_pos) + if sz == 0: + # Nothing to prune. 
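For illustration, a minimal sketch of how the Adjustments tunables in adjustments.py above are meant to be used. The HTTPServer/ThreadedTaskDispatcher wiring and the adj= keyword mirror the tests later in this patch (test_httpserver.py); the host, port and override values are placeholders.

from zope.server.adjustments import Adjustments, default_adj
from zope.server.taskthreads import ThreadedTaskDispatcher
from zope.server.http.httpserver import HTTPServer

default_adj.backlog = 256            # affects every server using the default

my_adj = Adjustments()               # per-server overrides
my_adj.recv_bytes = 16384
my_adj.channel_timeout = 300

td = ThreadedTaskDispatcher()
td.setThreadCount(4)
server = HTTPServer('127.0.0.1', 8080, task_dispatcher=td, adj=my_adj)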
+ return + nf = self.newfile() + while 1: + data = file.read(COPY_BYTES) + if not data: + break + nf.write(data) + self.file = nf + + def getfile(self): + return self.file + + + +class TempfileBasedBuffer(FileBasedBuffer): + + def __init__(self, from_buffer=None): + FileBasedBuffer.__init__(self, self.newfile(), from_buffer) + + def newfile(self): + from tempfile import TemporaryFile + return TemporaryFile('w+b') + + + +class StringIOBasedBuffer(FileBasedBuffer): + + def __init__(self, from_buffer=None): + if from_buffer is not None: + FileBasedBuffer.__init__(self, StringIO(), from_buffer) + else: + # Shortcut. :-) + self.file = StringIO() + + def newfile(self): + return StringIO() + + + +class OverflowableBuffer(object): + """ + This buffer implementation has four stages: + - No data + - String-based buffer + - StringIO-based buffer + - Temporary file storage + The first two stages are fastest for simple transfers. + """ + + overflowed = 0 + buf = None + strbuf = '' # String-based buffer. + + def __init__(self, overflow): + # overflow is the maximum to be stored in a StringIO buffer. + self.overflow = overflow + + def __len__(self): + buf = self.buf + if buf is not None: + return len(buf) + else: + return len(self.strbuf) + + def _create_buffer(self): + # print 'creating buffer' + strbuf = self.strbuf + if len(strbuf) >= self.overflow: + self._set_large_buffer() + else: + self._set_small_buffer() + buf = self.buf + if strbuf: + buf.append(self.strbuf) + self.strbuf = '' + return buf + + def _set_small_buffer(self): + self.buf = StringIOBasedBuffer(self.buf) + self.overflowed = 0 + + def _set_large_buffer(self): + self.buf = TempfileBasedBuffer(self.buf) + self.overflowed = 1 + + def append(self, s): + buf = self.buf + if buf is None: + strbuf = self.strbuf + if len(strbuf) + len(s) < STRBUF_LIMIT: + self.strbuf = strbuf + s + return + buf = self._create_buffer() + buf.append(s) + sz = len(buf) + if not self.overflowed: + if sz >= self.overflow: + self._set_large_buffer() + + def get(self, bytes=-1, skip=0): + buf = self.buf + if buf is None: + strbuf = self.strbuf + if not skip: + return strbuf + buf = self._create_buffer() + return buf.get(bytes, skip) + + def skip(self, bytes, allow_prune=0): + buf = self.buf + if buf is None: + strbuf = self.strbuf + if allow_prune and bytes == len(strbuf): + # We could slice instead of converting to + # a buffer, but that would eat up memory in + # large transfers. + self.strbuf = '' + return + buf = self._create_buffer() + buf.skip(bytes, allow_prune) + + def prune(self): + """ + A potentially expensive operation that removes all data + already retrieved from the buffer. + """ + buf = self.buf + if buf is None: + self.strbuf = '' + return + buf.prune() + if self.overflowed: + sz = len(buf) + if sz < self.overflow: + # Revert to a faster buffer. + self._set_small_buffer() + + def getfile(self): + buf = self.buf + if buf is None: + buf = self._create_buffer() + return buf.getfile() diff --git a/fixedstreamreceiver.py b/fixedstreamreceiver.py new file mode 100644 index 0000000..8d14f4e --- /dev/null +++ b/fixedstreamreceiver.py @@ -0,0 +1,52 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. 
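A short sketch of the four-stage OverflowableBuffer defined in buffers.py above, with a deliberately tiny overflow threshold so the switch from the in-memory stages to a temporary file is visible; the sizes are arbitrary.

from zope.server.buffers import OverflowableBuffer

buf = OverflowableBuffer(overflow=100)   # tiny threshold, for illustration only
buf.append('x' * 50)                     # still the plain-string stage
assert not buf.overflowed
buf.append('y' * 10000)                  # crosses the limit: now a tempfile
assert buf.overflowed
assert buf.get(20) == 'x' * 20           # peek without consuming
buf.skip(20, allow_prune=1)              # actually consume those bytes
assert len(buf) == 50 + 10000 - 20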
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""Fixed Stream Receiver + +$Id$ +""" + +from zope.server.interfaces import IStreamConsumer +from zope.interface import implements + + +class FixedStreamReceiver(object): + + implements(IStreamConsumer) + + # See IStreamConsumer + completed = 0 + + def __init__(self, cl, buf): + self.remain = cl + self.buf = buf + + def received(self, data): + 'See IStreamConsumer' + rm = self.remain + if rm < 1: + self.completed = 1 # Avoid any chance of spinning + return 0 + datalen = len(data) + if rm <= datalen: + self.buf.append(data[:rm]) + self.remain = 0 + self.completed = 1 + return rm + else: + self.buf.append(data) + self.remain -= datalen + return datalen + + def getfile(self): + return self.buf.getfile() diff --git a/ftp/logger.py b/ftp/logger.py index 95a2ea2..548b62c 100644 --- a/ftp/logger.py +++ b/ftp/logger.py @@ -11,11 +11,10 @@ # FOR A PARTICULAR PURPOSE. # ############################################################################## -""" +"""Common FTP Activity Logger $Id$ """ - import time from zope.server.http.commonaccesslogger import CommonAccessLogger diff --git a/ftp/publisher.py b/ftp/publisher.py new file mode 100644 index 0000000..b4b39c5 --- /dev/null +++ b/ftp/publisher.py @@ -0,0 +1,161 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""Zope Publisher-based FTP Server + +This FTP server uses the Zope 3 Publisher to execute commands. 
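The FixedStreamReceiver added in fixedstreamreceiver.py above consumes exactly the announced number of body bytes and reports how much of each piece of input it used; a small sketch, feeding it data in two pieces:

from zope.server.buffers import OverflowableBuffer
from zope.server.fixedstreamreceiver import FixedStreamReceiver

buf = OverflowableBuffer(overflow=525000)
rcv = FixedStreamReceiver(11, buf)         # expect an 11-byte body
assert rcv.received('hello ') == 6         # all consumed, not yet complete
assert rcv.received('world, extra') == 5   # only the 5 bytes still owed
assert rcv.completed
assert rcv.getfile().read() == 'hello world'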
+ +$Id$ +""" +import posixpath + +from cStringIO import StringIO + +from zope.server.interfaces.ftp import IFileSystem +from zope.server.interfaces.ftp import IFileSystemAccess + +from zope.server.ftp.server import FTPServer +from zope.publisher.publish import publish + +from zope.interface import implements + +class PublisherFileSystem(object): + """Generic Publisher FileSystem implementation.""" + + implements(IFileSystem) + + def __init__ (self, credentials, request_factory): + self.credentials = credentials + self.request_factory = request_factory + + def type(self, path): + if path == '/': + return 'd' + + return self._execute(path, 'type') + + def names(self, path, filter=None): + return self._execute(path, 'names', split=False, filter=filter) + + def ls(self, path, filter=None): + return self._execute(path, 'ls', split=False, filter=filter) + + def readfile(self, path, outstream, start=0, end=None): + return self._execute(path, 'readfile', + outstream=outstream, start=start, end=end) + + def lsinfo(self, path): + return self._execute(path, 'lsinfo') + + def mtime(self, path): + return self._execute(path, 'mtime') + + def size(self, path): + return self._execute(path, 'size') + + def mkdir(self, path): + return self._execute(path, 'mkdir') + + def remove(self, path): + return self._execute(path, 'remove') + + def rmdir(self, path): + return self._execute(path, 'rmdir') + + def rename(self, old, new): + 'See IWriteFileSystem' + old = self._translate(old) + new = self._translate(new) + path0, old = posixpath.split(old) + path1, new = posixpath.split(new) + assert path0 == path1 + return self._execute(path0, 'rename', split=False, old=old, new=new) + + def writefile(self, path, instream, start=None, end=None, append=False): + 'See IWriteFileSystem' + return self._execute( + path, 'writefile', + instream=instream, start=start, end=end, append=append) + + def writable(self, path): + 'See IWriteFileSystem' + return self._execute(path, 'writable') + + def _execute(self, path, command, split=True, **kw): + env = {} + env.update(kw) + env['command'] = command + + path = self._translate(path) + + if split: + env['path'], env['name'] = posixpath.split(path) + else: + env['path'] = path + + env['credentials'] = self.credentials + # NoOutput avoids creating a black hole. + request = self.request_factory(StringIO(''), NoOutput(), env) + + # Note that publish() calls close() on request, which deletes the + # response from the request, so that we need to keep track of it. + response = request.response + publish(request) + return response.getResult() + + def _translate (self, path): + # Normalize + path = posixpath.normpath(path) + if path.startswith('..'): + # Someone is trying to get lower than the permitted root. + # We just ignore it. 
+ path = '/' + return path + + +class NoOutput(object): + """An output stream lookalike that warns you if you try to + dump anything into it.""" + + def write(self, data): + raise RuntimeError, "Not a writable stream" + + def flush(self): + pass + + close = flush + + +class PublisherFTPServer(FTPServer): + """Generic FTP Server""" + + def __init__(self, request_factory, name, ip, port, *args, **kw): + fs_access = PublisherFileSystemAccess(request_factory) + super(PublisherFTPServer, self).__init__(ip, port, fs_access, + *args, **kw) + +class PublisherFileSystemAccess(object): + + implements(IFileSystemAccess) + + def __init__(self, request_factory): + self.request_factory = request_factory + + def authenticate(self, credentials): + # We can't actually do any authentication initially, as the + # user may not be defined at the root. + pass + + def open(self, credentials): + return PublisherFileSystem(credentials, self.request_factory) + diff --git a/ftp/server.py b/ftp/server.py index c2ce912..6beda5d 100644 --- a/ftp/server.py +++ b/ftp/server.py @@ -769,7 +769,7 @@ def close(self, *reply_args): -class FinishedRecvTask: +class FinishedRecvTask(object): implements(ITask) @@ -867,7 +867,7 @@ def close(self, *reply_args): ChannelBaseClass.close(self) -class ApplicationXmitStream: +class ApplicationXmitStream(object): """Provide stream output, remapping close() to close_when_done(). """ diff --git a/ftp/tests/fstests.py b/ftp/tests/fstests.py new file mode 100644 index 0000000..fdeefa1 --- /dev/null +++ b/ftp/tests/fstests.py @@ -0,0 +1,156 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. 
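A rough sketch of how the publisher FTP pieces above are wired together. The request_factory below is only a placeholder (a real one returns a zope.publisher request bound to a publication, as in ftp/tests/test_publisher.py), the address is arbitrary, and the task_dispatcher keyword is assumed to be accepted by FTPServer just as it is by the HTTP servers elsewhere in this patch.

from zope.server.ftp.publisher import PublisherFTPServer
from zope.server.taskthreads import ThreadedTaskDispatcher

def request_factory(input_stream, output_stream, env):
    # Placeholder only; env carries 'command', 'path'/'name' and
    # 'credentials' as built by PublisherFileSystem._execute().
    raise NotImplementedError

td = ThreadedTaskDispatcher()
td.setThreadCount(4)
server = PublisherFTPServer(request_factory, 'FTP', '127.0.0.1', 8021,
                            task_dispatcher=td)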
+# +############################################################################## +"""Abstract file-system tests + +$Id$ +""" +from StringIO import StringIO +from zope.interface.verify import verifyObject +from zope.server.interfaces.ftp import IFileSystem + +class FileSystemTests(object): + """Tests of a readable filesystem + """ + + filesystem = None + dir_name = '/dir' + file_name = '/dir/file.txt' + unwritable_filename = '/dir/protected.txt' + dir_contents = ['file.txt', 'protected.txt'] + file_contents = 'Lengthen your stride' + + def test_type(self): + self.assertEqual(self.filesystem.type(self.dir_name), 'd') + self.assertEqual(self.filesystem.type(self.file_name), 'f') + + + def test_names(self): + lst = self.filesystem.names(self.dir_name) + lst.sort() + self.assertEqual(lst, self.dir_contents) + + def test_readfile(self): + s = StringIO() + self.filesystem.readfile(self.file_name, s) + self.assertEqual(s.getvalue(), self.file_contents) + + + def testReadPartOfFile(self): + s = StringIO() + self.filesystem.readfile(self.file_name, s, 2) + self.assertEqual(s.getvalue(), self.file_contents[2:]) + + + def testReadPartOfFile2(self): + s = StringIO() + self.filesystem.readfile(self.file_name, s, 1, 5) + self.assertEqual(s.getvalue(), self.file_contents[1:5]) + + def test_IFileSystemInterface(self): + verifyObject(IFileSystem, self.filesystem) + + def testRemove(self): + self.filesystem.remove(self.file_name) + self.failIf(self.filesystem.type(self.file_name)) + + + def testMkdir(self): + path = self.dir_name + '/x' + self.filesystem.mkdir(path) + self.assertEqual(self.filesystem.type(path), 'd') + + def testRmdir(self): + self.filesystem.remove(self.file_name) + self.filesystem.rmdir(self.dir_name) + self.failIf(self.filesystem.type(self.dir_name)) + + + def testRename(self): + self.filesystem.rename(self.file_name, self.file_name + '.bak') + self.assertEqual(self.filesystem.type(self.file_name), None) + self.assertEqual(self.filesystem.type(self.file_name + '.bak'), 'f') + + + def testWriteFile(self): + s = StringIO() + self.filesystem.readfile(self.file_name, s) + self.assertEqual(s.getvalue(), self.file_contents) + + data = 'Always ' + self.file_contents + s = StringIO(data) + self.filesystem.writefile(self.file_name, s) + + s = StringIO() + self.filesystem.readfile(self.file_name, s) + self.assertEqual(s.getvalue(), data) + + + def testAppendToFile(self): + data = ' again' + s = StringIO(data) + self.filesystem.writefile(self.file_name, s, append=True) + + s = StringIO() + self.filesystem.readfile(self.file_name, s) + self.assertEqual(s.getvalue(), self.file_contents + data) + + def testWritePartOfFile(self): + data = '123' + s = StringIO(data) + self.filesystem.writefile(self.file_name, s, 3, 6) + + expect = self.file_contents[:3] + data + self.file_contents[6:] + + s = StringIO() + self.filesystem.readfile(self.file_name, s) + self.assertEqual(s.getvalue(), expect) + + def testWritePartOfFile_and_truncate(self): + data = '123' + s = StringIO(data) + self.filesystem.writefile(self.file_name, s, 3) + + expect = self.file_contents[:3] + data + + s = StringIO() + self.filesystem.readfile(self.file_name, s) + self.assertEqual(s.getvalue(), expect) + + def testWriteBeyondEndOfFile(self): + partlen = len(self.file_contents) - 6 + data = 'daylight savings' + s = StringIO(data) + self.filesystem.writefile(self.file_name, s, partlen) + + expect = self.file_contents[:partlen] + data + + s = StringIO() + self.filesystem.readfile(self.file_name, s) + self.assertEqual(s.getvalue(), expect) + 
+ + def testWriteNewFile(self): + s = StringIO(self.file_contents) + self.filesystem.writefile(self.file_name + '.new', s) + + s = StringIO() + self.filesystem.readfile(self.file_name, s) + self.assertEqual(s.getvalue(), self.file_contents) + + + def test_writable(self): + self.failIf(self.filesystem.writable(self.dir_name)) + self.failIf(self.filesystem.writable(self.unwritable_filename)) + self.failUnless(self.filesystem.writable(self.file_name)) + self.failUnless(self.filesystem.writable(self.file_name+'1')) diff --git a/ftp/tests/test_publisher.py b/ftp/tests/test_publisher.py index 73ed028..a3393c4 100644 --- a/ftp/tests/test_publisher.py +++ b/ftp/tests/test_publisher.py @@ -28,7 +28,7 @@ def rename(self, path, old, new): return demofs.DemoFileSystem.rename( self, "%s/%s" % (path, old), "%s/%s" % (path, new)) -class Publication: +class Publication(object): def __init__(self, root): self.root = root @@ -58,7 +58,7 @@ def handleException(self, object, request, info, retry_allowed=True): request.response._exc = info[:2] -class Request: +class Request(object): def __init__(self, input, output, env): self.env = env @@ -76,7 +76,7 @@ def traverse(self, root): def close(self): pass -class Response: +class Response(object): _exc = _body = None @@ -91,7 +91,7 @@ def getResult(self): raise self._exc[0], self._exc[1] return self._body -class RequestFactory: +class RequestFactory(object): def __init__(self, root): self.pub = Publication(root) diff --git a/http/chunking.py b/http/chunking.py new file mode 100644 index 0000000..f1fb383 --- /dev/null +++ b/http/chunking.py @@ -0,0 +1,108 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""Data Chunk Receiver + +$Id$ +""" + +from zope.server.utilities import find_double_newline +from zope.server.interfaces import IStreamConsumer +from zope.interface import implements + + +class ChunkedReceiver(object): + + implements(IStreamConsumer) + + chunk_remainder = 0 + control_line = '' + all_chunks_received = 0 + trailer = '' + completed = 0 + + # max_control_line = 1024 + # max_trailer = 65536 + + + def __init__(self, buf): + self.buf = buf + + def received(self, s): + # Returns the number of bytes consumed. + if self.completed: + return 0 + orig_size = len(s) + while s: + rm = self.chunk_remainder + if rm > 0: + # Receive the remainder of a chunk. + to_write = s[:rm] + self.buf.append(to_write) + written = len(to_write) + s = s[written:] + self.chunk_remainder -= written + elif not self.all_chunks_received: + # Receive a control line. + s = self.control_line + s + pos = s.find('\n') + if pos < 0: + # Control line not finished. + self.control_line = s + s = '' + else: + # Control line finished. + line = s[:pos] + s = s[pos + 1:] + self.control_line = '' + line = line.strip() + if line: + # Begin a new chunk. + semi = line.find(';') + if semi >= 0: + # discard extension info. 
+ line = line[:semi] + sz = int(line.strip(), 16) # hexadecimal + if sz > 0: + # Start a new chunk. + self.chunk_remainder = sz + else: + # Finished chunks. + self.all_chunks_received = 1 + # else expect a control line. + else: + # Receive the trailer. + trailer = self.trailer + s + if trailer.startswith('\r\n'): + # No trailer. + self.completed = 1 + return orig_size - (len(trailer) - 2) + elif trailer.startswith('\n'): + # No trailer. + self.completed = 1 + return orig_size - (len(trailer) - 1) + pos = find_double_newline(trailer) + if pos < 0: + # Trailer not finished. + self.trailer = trailer + s = '' + else: + # Finished the trailer. + self.completed = 1 + self.trailer = trailer[:pos] + return orig_size - (len(trailer) - pos) + return orig_size + + + def getfile(self): + return self.buf.getfile() diff --git a/http/commonaccesslogger.py b/http/commonaccesslogger.py index 45befec..cd202a8 100644 --- a/http/commonaccesslogger.py +++ b/http/commonaccesslogger.py @@ -11,7 +11,7 @@ # FOR A PARTICULAR PURPOSE. # ############################################################################## -""" +"""Common Access Logger $Id$ """ @@ -23,7 +23,7 @@ from zope.server.logger.resolvinglogger import ResolvingLogger from zope.server.logger.unresolvinglogger import UnresolvingLogger -class CommonAccessLogger: +class CommonAccessLogger(object): """Outputs accesses in common HTTP log format. """ diff --git a/http/httprequestparser.py b/http/httprequestparser.py new file mode 100644 index 0000000..3331c20 --- /dev/null +++ b/http/httprequestparser.py @@ -0,0 +1,204 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""HTTP Request Parser + +This server uses asyncore to accept connections and do initial +processing but threads to do work. + +$Id$ +""" +import re +from urllib import unquote + +from zope.server.fixedstreamreceiver import FixedStreamReceiver +from zope.server.buffers import OverflowableBuffer +from zope.server.utilities import find_double_newline +from zope.server.interfaces import IStreamConsumer +from zope.interface import implements + +try: + from cStringIO import StringIO +except ImportError: + from StringIO import StringIO + + +class HTTPRequestParser(object): + """A structure that collects the HTTP request. + + Once the stream is completed, the instance is passed to + a server task constructor. + """ + + implements(IStreamConsumer) + + completed = 0 # Set once request is completed. + empty = 0 # Set if no request was made. + header_plus = '' + chunked = 0 + content_length = 0 + body_rcv = None + # Other attributes: first_line, header, headers, command, uri, version, + # path, query, fragment + + # headers is a mapping containing keys translated to uppercase + # with dashes turned into underscores. + + def __init__(self, adj): + """ + adj is an Adjustments object. + """ + self.headers = {} + self.adj = adj + + def received(self, data): + """ + Receives the HTTP stream for one request. 
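A small sketch of the ChunkedReceiver defined in http/chunking.py above, decoding a complete chunked body (two chunks, the zero-size terminator, and an empty trailer) in one call; the sizes on the control lines are hexadecimal.

from zope.server.buffers import OverflowableBuffer
from zope.server.http.chunking import ChunkedReceiver

buf = OverflowableBuffer(overflow=525000)
rcv = ChunkedReceiver(buf)
body = '5\r\nhello\r\n6\r\n world\r\n0\r\n\r\n'
assert rcv.received(body) == len(body)
assert rcv.completed
assert rcv.getfile().read() == 'hello world'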
+ Returns the number of bytes consumed. + Sets the completed flag once both the header and the + body have been received. + """ + if self.completed: + return 0 # Can't consume any more. + datalen = len(data) + br = self.body_rcv + if br is None: + # In header. + s = self.header_plus + data + index = find_double_newline(s) + if index >= 0: + # Header finished. + header_plus = s[:index] + consumed = len(data) - (len(s) - index) + self.in_header = 0 + # Remove preceeding blank lines. + header_plus = header_plus.lstrip() + if not header_plus: + self.empty = 1 + self.completed = 1 + else: + self.parse_header(header_plus) + if self.body_rcv is None: + self.completed = 1 + return consumed + else: + # Header not finished yet. + self.header_plus = s + return datalen + else: + # In body. + consumed = br.received(data) + if br.completed: + self.completed = 1 + return consumed + + + def parse_header(self, header_plus): + """ + Parses the header_plus block of text (the headers plus the + first line of the request). + """ + index = header_plus.find('\n') + if index >= 0: + first_line = header_plus[:index].rstrip() + header = header_plus[index + 1:] + else: + first_line = header_plus.rstrip() + header = '' + self.first_line = first_line + self.header = header + + lines = self.get_header_lines() + headers = self.headers + for line in lines: + index = line.find(':') + if index > 0: + key = line[:index] + value = line[index + 1:].strip() + key1 = key.upper().replace('-', '_') + headers[key1] = value + # else there's garbage in the headers? + + command, uri, version = self.crack_first_line() + self.command = str(command) + self.uri = str(uri) + self.version = version + self.split_uri() + + if version == '1.1': + te = headers.get('TRANSFER_ENCODING', '') + if te == 'chunked': + from zope.server.http.chunking import ChunkedReceiver + self.chunked = 1 + buf = OverflowableBuffer(self.adj.inbuf_overflow) + self.body_rcv = ChunkedReceiver(buf) + if not self.chunked: + cl = int(headers.get('CONTENT_LENGTH', 0)) + self.content_length = cl + if cl > 0: + buf = OverflowableBuffer(self.adj.inbuf_overflow) + self.body_rcv = FixedStreamReceiver(cl, buf) + + + def get_header_lines(self): + """ + Splits the header into lines, putting multi-line headers together. + """ + r = [] + lines = self.header.split('\n') + for line in lines: + if line and line[0] in ' \t': + r[-1] = r[-1] + line[1:] + else: + r.append(line) + return r + + first_line_re = re.compile ( + '([^ ]+) (?:[^ :?#]+://[^ ?#/]*)?([^ ]+)(( HTTP/([0-9.]+))$|$)') + + def crack_first_line(self): + r = self.first_line + m = self.first_line_re.match (r) + if m is not None and m.end() == len(r): + if m.group(3): + version = m.group(5) + else: + version = None + return m.group(1).upper(), m.group(2), version + else: + return None, None, None + + path_regex = re.compile ( + # path query fragment + r'([^?#]*)(\?[^#]*)?(#.*)?' 
+ ) + + def split_uri(self): + m = self.path_regex.match (self.uri) + if m.end() != len(self.uri): + raise ValueError, "Broken URI" + else: + path, query, self.fragment = m.groups() + if path and '%' in path: + path = unquote(path) + self.path = path + if query: + query = query[1:] + self.query = query + + def getBodyStream(self): + body_rcv = self.body_rcv + if body_rcv is not None: + return body_rcv.getfile() + else: + return StringIO('') diff --git a/http/httptask.py b/http/httptask.py new file mode 100644 index 0000000..c29b6c5 --- /dev/null +++ b/http/httptask.py @@ -0,0 +1,236 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""HTTP Task + +An HTTP task that can execute an HTTP request with the help of the channel and +the server it belongs to. + +$Id$ +""" +import socket +import time + +from zope.server.http.http_date import build_http_date + +from zope.server.interfaces import IHeaderOutput +from zope.server.interfaces import ITask + +from zope.interface import implements + +rename_headers = { + 'CONTENT_LENGTH' : 'CONTENT_LENGTH', + 'CONTENT_TYPE' : 'CONTENT_TYPE', + 'CONNECTION' : 'CONNECTION_TYPE', + } + +class HTTPTask(object): + """An HTTP task accepts a request and writes to a channel. + + Subclass this and override the execute() method. + """ + + implements(ITask, IHeaderOutput) #, IOutputStream + + instream = None + close_on_finish = 1 + status = '200' + reason = 'Ok' + wrote_header = 0 + accumulated_headers = None + bytes_written = 0 + auth_user_name = '' + cgi_env = None + + def __init__(self, channel, request_data): + self.channel = channel + self.request_data = request_data + self.response_headers = { + 'Server': channel.server.SERVER_IDENT, + } + version = request_data.version + if version not in ('1.0', '1.1'): + # fall back to a version we support. 
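A sketch of driving the HTTPRequestParser above by hand: received() reports how many bytes it consumed, so the caller re-feeds the remainder until completed is set. Note how header names come back upper-cased with dashes turned into underscores.

from zope.server.adjustments import default_adj
from zope.server.http.httprequestparser import HTTPRequestParser

parser = HTTPRequestParser(default_adj)
request = ('GET /a/b?x=1 HTTP/1.1\r\n'
           'Host: localhost\r\n'
           'Content-Length: 5\r\n'
           '\r\n'
           'hello')
consumed = parser.received(request)              # header only
consumed += parser.received(request[consumed:])  # then the body
assert consumed == len(request) and parser.completed
assert (parser.command, parser.path, parser.query) == ('GET', '/a/b', 'x=1')
assert parser.headers['HOST'] == 'localhost'
assert parser.getBodyStream().read() == 'hello'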
+ version = '1.0' + self.version = version + + def service(self): + """See zope.server.interfaces.ITask""" + try: + try: + self.start() + self.channel.server.executeRequest(self) + self.finish() + except socket.error: + self.close_on_finish = 1 + if self.channel.adj.log_socket_errors: + raise + finally: + self.channel.end_task(self.close_on_finish) + + def cancel(self): + """See zope.server.interfaces.ITask""" + self.channel.close_when_done() + + def defer(self): + """See zope.server.interfaces.ITask""" + pass + + def setResponseStatus(self, status, reason): + """See zope.server.interfaces.IHeaderOutput""" + self.status = status + self.reason = reason + + def setResponseHeaders(self, mapping): + """See zope.server.interfaces.IHeaderOutput""" + self.response_headers.update(mapping) + + def appendResponseHeaders(self, lst): + """See zope.server.interfaces.IHeaderOutput""" + accum = self.accumulated_headers + if accum is None: + self.accumulated_headers = accum = [] + accum.extend(lst) + + def wroteResponseHeader(self): + """See zope.server.interfaces.IHeaderOutput""" + return self.wrote_header + + def setAuthUserName(self, name): + """See zope.server.interfaces.IHeaderOutput""" + self.auth_user_name = name + + def prepareResponseHeaders(self): + version = self.version + # Figure out whether the connection should be closed. + connection = self.request_data.headers.get('CONNECTION', '').lower() + close_it = 0 + response_headers = self.response_headers + + if version == '1.0': + if connection == 'keep-alive': + if not ('Content-Length' in response_headers): + close_it = 1 + else: + response_headers['Connection'] = 'Keep-Alive' + else: + close_it = 1 + elif version == '1.1': + if connection == 'close': + close_it = 1 + elif 'Transfer-Encoding' in response_headers: + if not response_headers['Transfer-Encoding'] == 'chunked': + close_it = 1 + elif self.status == '304': + # Replying with headers only. + pass + elif not ('Content-Length' in response_headers): + close_it = 1 + else: + # Close if unrecognized HTTP version. + close_it = 1 + + self.close_on_finish = close_it + if close_it: + self.response_headers['Connection'] = 'close' + + def buildResponseHeader(self): + self.prepareResponseHeaders() + first_line = 'HTTP/%s %s %s' % (self.version, self.status, self.reason) + lines = [first_line] + ['%s: %s' % hv + for hv in self.response_headers.items()] + accum = self.accumulated_headers + if accum is not None: + lines.extend(accum) + res = '%s\r\n\r\n' % '\r\n'.join(lines) + return res + + def getCGIEnvironment(self): + """Returns a CGI-like environment.""" + env = self.cgi_env + if env is not None: + # Return the cached copy. + return env + + request_data = self.request_data + path = request_data.path + channel = self.channel + server = channel.server + + while path and path.startswith('/'): + path = path[1:] + + env = {} + env['REQUEST_METHOD'] = request_data.command.upper() + env['SERVER_PORT'] = str(server.port) + env['SERVER_NAME'] = server.server_name + env['SERVER_SOFTWARE'] = server.SERVER_IDENT + env['SERVER_PROTOCOL'] = "HTTP/%s" % self.version + env['CHANNEL_CREATION_TIME'] = channel.creation_time + env['SCRIPT_NAME']='' + env['PATH_INFO']='/' + path + query = request_data.query + if query: + env['QUERY_STRING'] = query + env['GATEWAY_INTERFACE'] = 'CGI/1.1' + addr = channel.addr[0] + env['REMOTE_ADDR'] = addr + + # If the server has a resolver, try to get the + # remote host from the resolver's cache. 
+ resolver = getattr(server, 'resolver', None) + if resolver is not None: + dns_cache = resolver.cache + if addr in dns_cache: + remote_host = dns_cache[addr][2] + if remote_host is not None: + env['REMOTE_HOST'] = remote_host + + env_has = env.has_key + + for key, value in request_data.headers.items(): + value = value.strip() + mykey = rename_headers.get(key, None) + if mykey is None: + mykey = 'HTTP_%s' % key + if not env_has(mykey): + env[mykey] = value + + self.cgi_env = env + return env + + def start(self): + now = time.time() + self.start_time = now + self.response_headers['Date'] = build_http_date (now) + + def finish(self): + if not self.wrote_header: + self.write('') + hit_log = self.channel.server.hit_log + if hit_log is not None: + hit_log.log(self) + + def write(self, data): + channel = self.channel + if not self.wrote_header: + rh = self.buildResponseHeader() + channel.write(rh) + self.bytes_written += len(rh) + self.wrote_header = 1 + if data: + channel.write(data) + self.bytes_written += len(data) + + def flush(self): + self.channel.flush() diff --git a/http/tests/test_httpserver.py b/http/tests/test_httpserver.py new file mode 100644 index 0000000..16a533e --- /dev/null +++ b/http/tests/test_httpserver.py @@ -0,0 +1,293 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +""" + +$Id$ +""" + +import unittest +from asyncore import socket_map, poll +import socket + +from threading import Thread +from zope.server.taskthreads import ThreadedTaskDispatcher +from zope.server.http.httpserver import HTTPServer +from zope.server.adjustments import Adjustments +from zope.server.interfaces import ITask +from zope.server.tests.asyncerror import AsyncoreErrorHook +from zope.interface import implements + +from httplib import HTTPConnection +from httplib import HTTPResponse as ClientHTTPResponse + +from time import sleep, time + +td = ThreadedTaskDispatcher() + +LOCALHOST = '127.0.0.1' +SERVER_PORT = 0 # Set these port numbers to 0 to auto-bind, or +CONNECT_TO_PORT = 0 # use specific numbers to inspect using TCPWatch. + + +my_adj = Adjustments() +# Reduce overflows to make testing easier. 
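A rough sketch of the header-building side of HTTPTask above, using throwaway stub objects in place of a real channel, server and parsed request (those stubs are assumptions for illustration; in practice they come from the HTTP server and HTTPRequestParser). With an HTTP/1.1 keep-alive request and a Content-Length response header, prepareResponseHeaders() leaves the connection open.

from zope.server.http.httptask import HTTPTask

class StubServer(object):
    SERVER_IDENT = 'zope.server.http'

class StubChannel(object):
    server = StubServer()

class StubRequest(object):
    version = '1.1'
    headers = {'CONNECTION': 'keep-alive'}

task = HTTPTask(StubChannel(), StubRequest())
task.setResponseStatus('404', 'Not Found')
task.setResponseHeaders({'Content-Type': 'text/plain', 'Content-Length': '10'})
header = task.buildResponseHeader()
assert header.startswith('HTTP/1.1 404 Not Found\r\n')
assert 'Connection: close' not in header       # keep-alive honoured
assert task.close_on_finish == 0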
+my_adj.outbuf_overflow = 10000 +my_adj.inbuf_overflow = 10000 + + +class EchoHTTPServer(HTTPServer): + + def executeRequest(self, task): + headers = task.request_data.headers + if 'CONTENT_LENGTH' in headers: + cl = headers['CONTENT_LENGTH'] + task.response_headers['Content-Length'] = cl + instream = task.request_data.getBodyStream() + while 1: + data = instream.read(8192) + if not data: + break + task.write(data) + + +class SleepingTask(object): + + implements(ITask) + + def service(self): + sleep(0.2) + + def cancel(self): + pass + + def defer(self): + pass + + +class Tests(unittest.TestCase, AsyncoreErrorHook): + + def setUp(self): + td.setThreadCount(4) + self.orig_map_size = len(socket_map) + self.hook_asyncore_error() + self.server = EchoHTTPServer(LOCALHOST, SERVER_PORT, + task_dispatcher=td, adj=my_adj) + if CONNECT_TO_PORT == 0: + self.port = self.server.socket.getsockname()[1] + else: + self.port = CONNECT_TO_PORT + self.run_loop = 1 + self.counter = 0 + self.thread = Thread(target=self.loop) + self.thread.start() + sleep(0.1) # Give the thread some time to start. + + def tearDown(self): + self.run_loop = 0 + self.thread.join() + td.shutdown() + self.server.close() + # Make sure all sockets get closed by asyncore normally. + timeout = time() + 5 + while 1: + if len(socket_map) == self.orig_map_size: + # Clean! + break + if time() >= timeout: + self.fail('Leaked a socket: %s' % `socket_map`) + poll(0.1) + self.unhook_asyncore_error() + + def loop(self): + while self.run_loop: + self.counter = self.counter + 1 + #print 'loop', self.counter + poll(0.1) + + def testEchoResponse(self, h=None, add_headers=None, body=''): + if h is None: + h = HTTPConnection(LOCALHOST, self.port) + headers = {} + if add_headers: + headers.update(add_headers) + headers["Accept"] = "text/plain" + if body: + headers["Content-Length"] = str(int(len(body))) + h.request("GET", "/", body, headers) + response = h.getresponse() + self.failUnlessEqual(int(response.status), 200) + length = int(response.getheader('Content-Length', '0')) + response_body = response.read() + self.failUnlessEqual(length, len(response_body)) + self.failUnlessEqual(response_body, body) + + def testMultipleRequestsWithoutBody(self): + # Tests the use of multiple requests in a single connection. + h = HTTPConnection(LOCALHOST, self.port) + for n in range(3): + self.testEchoResponse(h) + self.testEchoResponse(h, {'Connection': 'close'}) + + def testMultipleRequestsWithBody(self): + # Tests the use of multiple requests in a single connection. + h = HTTPConnection(LOCALHOST, self.port) + for n in range(3): + self.testEchoResponse(h, body='Hello, world!') + self.testEchoResponse(h, {'Connection': 'close'}) + + def testPipelining(self): + # Tests the use of several requests issued at once. 
+ s = ("GET / HTTP/1.0\r\n" + "Connection: %s\r\n" + "Content-Length: %d\r\n" + "\r\n" + "%s") + to_send = '' + count = 25 + for n in range(count): + body = "Response #%d\r\n" % (n + 1) + if n + 1 < count: + conn = 'keep-alive' + else: + conn = 'close' + to_send += s % (conn, len(body), body) + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((LOCALHOST, self.port)) + sock.send(to_send) + for n in range(count): + expect_body = "Response #%d\r\n" % (n + 1) + response = ClientHTTPResponse(sock) + response.begin() + self.failUnlessEqual(int(response.status), 200) + length = int(response.getheader('Content-Length', '0')) + response_body = response.read(length) + self.failUnlessEqual(length, len(response_body)) + self.failUnlessEqual(response_body, expect_body) + + def testWithoutCRLF(self): + # Tests the use of just newlines rather than CR/LFs. + data = "Echo\nthis\r\nplease" + s = ("GET / HTTP/1.0\n" + "Connection: close\n" + "Content-Length: %d\n" + "\n" + "%s") % (len(data), data) + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((LOCALHOST, self.port)) + sock.send(s) + response = ClientHTTPResponse(sock) + response.begin() + self.failUnlessEqual(int(response.status), 200) + length = int(response.getheader('Content-Length', '0')) + response_body = response.read(length) + self.failUnlessEqual(length, len(data)) + self.failUnlessEqual(response_body, data) + + def testLargeBody(self): + # Tests the use of multiple requests in a single connection. + h = HTTPConnection(LOCALHOST, self.port) + s = 'This string has 32 characters.\r\n' * 32 # 1024 characters. + self.testEchoResponse(h, body=(s * 1024)) # 1 MB + self.testEchoResponse(h, {'Connection': 'close'}, + body=(s * 100)) # 100 KB + + def testManyClients(self): + import sys + + # Set the number of connections to make. A previous comment said + # Linux kernel (2.4.8) doesn't like > 128. + # The test used to use 50. Win98SE can't handle that many, dying + # with + # File "C:\PYTHON23\Lib\httplib.py", line 548, in connect + # raise socket.error, msg + # error: (10055, 'No buffer space available') + nconn = 50 + if sys.platform == 'win32': + platform = sys.getwindowsversion()[3] + if platform < 2: + # 0 is Win32s on Windows 3.1 + # 1 is 95/98/ME + # 2 is NT/2000/XP + + # Pre-NT. 20 should work. The exact number you can get away + # with depends on what you're running at the same time (e.g., + # browsers and AIM and email delivery consume sockets too). + nconn = 20 + + conns = [] + for n in range(nconn): + #print 'open', n, clock() + h = HTTPConnection(LOCALHOST, self.port) + #h.debuglevel = 1 + h.request("GET", "/", headers={"Accept": "text/plain"}) + conns.append(h) + # If you uncomment the next line, you can raise the + # number of connections much higher without running + # into delays. + #sleep(0.01) + responses = [] + for h in conns: + response = h.getresponse() + self.failUnlessEqual(response.status, 200) + responses.append(response) + for response in responses: + response.read() + + def testThreading(self): + # Ensures the correct number of threads keep running. + for n in range(4): + td.addTask(SleepingTask()) + # Try to confuse the task manager. + td.setThreadCount(2) + td.setThreadCount(1) + sleep(0.5) + # There should be 1 still running. 
+ self.failUnlessEqual(len(td.threads), 1) + + def testChunkingRequestWithoutContent(self): + h = HTTPConnection(LOCALHOST, self.port) + h.request("GET", "/", headers={"Accept": "text/plain", + "Transfer-Encoding": "chunked"}) + h.send("0\r\n\r\n") + response = h.getresponse() + self.failUnlessEqual(int(response.status), 200) + response_body = response.read() + self.failUnlessEqual(response_body, '') + + def testChunkingRequestWithContent(self): + control_line="20;\r\n" # 20 hex = 32 dec + s = 'This string has 32 characters.\r\n' + expect = s * 12 + + h = HTTPConnection(LOCALHOST, self.port) + h.request("GET", "/", headers={"Accept": "text/plain", + "Transfer-Encoding": "chunked"}) + for n in range(12): + h.send(control_line) + h.send(s) + h.send("0\r\n\r\n") + response = h.getresponse() + self.failUnlessEqual(int(response.status), 200) + response_body = response.read() + self.failUnlessEqual(response_body, expect) + + +def test_suite(): + loader = unittest.TestLoader() + return loader.loadTestsFromTestCase(Tests) + +if __name__=='__main__': + unittest.TextTestRunner().run(test_suite()) diff --git a/http/tests/test_publisherserver.py b/http/tests/test_publisherserver.py new file mode 100644 index 0000000..62de507 --- /dev/null +++ b/http/tests/test_publisherserver.py @@ -0,0 +1,192 @@ +############################################################################## +# +# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +############################################################################## +"""Test Puvlisher-based HTTP Server + +$Id$ +""" +import unittest +from asyncore import socket_map, poll +from threading import Thread + +from zope.server.taskthreads import ThreadedTaskDispatcher +from zope.server.http.publisherhttpserver import PublisherHTTPServer + +from zope.component.tests.placelesssetup import PlacelessSetup +import zope.component + +from zope.i18n.interfaces import IUserPreferredCharsets + +from zope.publisher.http import IHTTPRequest +from zope.publisher.http import HTTPCharsets +from zope.publisher.browser import BrowserRequest +from zope.publisher.base import DefaultPublication +from zope.publisher.interfaces import Redirect, Retry +from zope.publisher.http import HTTPRequest + +from httplib import HTTPConnection + +from time import sleep + +td = ThreadedTaskDispatcher() + +LOCALHOST = '127.0.0.1' + +HTTPRequest.STAGGER_RETRIES = 0 # Don't pause. + + +class Conflict(Exception): + """ + Pseudo ZODB conflict error. + """ + + +class PublicationWithConflict(DefaultPublication): + + def handleException(self, object, request, exc_info, retry_allowed=1): + if exc_info[0] is Conflict and retry_allowed: + # This simulates a ZODB retry. + raise Retry(exc_info) + else: + DefaultPublication.handleException(self, object, request, exc_info, + retry_allowed) + + +class tested_object(object): + """Docstring required by publisher.""" + tries = 0 + + def __call__(self, REQUEST): + return 'URL invoked: %s' % REQUEST.URL + + def redirect_method(self, REQUEST): + "Generates a redirect using the redirect() method." 
+ REQUEST.response.redirect("http://somewhere.com/redirect") + + def redirect_exception(self): + "Generates a redirect using an exception." + raise Redirect("http://somewhere.com/exception") + + def conflict(self, REQUEST, wait_tries): + """ + Returns 202 status only after (wait_tries) tries. + """ + if self.tries >= int(wait_tries): + raise "Accepted" + else: + self.tries += 1 + raise Conflict + + +class Tests(PlacelessSetup, unittest.TestCase): + + def setUp(self): + super(Tests, self).setUp() + as = zope.component.getService('Adapters') + as.register([IHTTPRequest], IUserPreferredCharsets, '', HTTPCharsets) + obj = tested_object() + obj.folder = tested_object() + obj.folder.item = tested_object() + + obj._protected = tested_object() + + pub = PublicationWithConflict(obj) + + def request_factory(input_stream, output_steam, env): + request = BrowserRequest(input_stream, output_steam, env) + request.setPublication(pub) + return request + + td.setThreadCount(4) + # Bind to any port on localhost. + self.server = PublisherHTTPServer(request_factory, 'Browser', + LOCALHOST, 0, task_dispatcher=td) + self.port = self.server.socket.getsockname()[1] + self.run_loop = 1 + self.thread = Thread(target=self.loop) + self.thread.start() + sleep(0.1) # Give the thread some time to start. + + def tearDown(self): + self.run_loop = 0 + self.thread.join() + td.shutdown() + self.server.close() + + def loop(self): + while self.run_loop: + poll(0.1, socket_map) + + def testResponse(self, path='/', status_expected=200, + add_headers=None, request_body=''): + h = HTTPConnection(LOCALHOST, self.port) + h.putrequest('GET', path) + h.putheader('Accept', 'text/plain') + if add_headers: + for k, v in add_headers.items(): + h.putheader(k, v) + if request_body: + h.putheader('Content-Length', str(int(len(request_body)))) + h.endheaders() + if request_body: + h.send(request_body) + response = h.getresponse() + length = int(response.getheader('Content-Length', '0')) + if length: + response_body = response.read(length) + else: + response_body = '' + + # Please do not disable the status code check. It must work. + self.failUnlessEqual(int(response.status), status_expected) + + self.failUnlessEqual(length, len(response_body)) + + if (status_expected == 200): + if path == '/': path = '' + expect_response = 'URL invoked: http://%s:%d%s' % (LOCALHOST, + self.port, path) + self.failUnlessEqual(response_body, expect_response) + + def testDeeperPath(self): + self.testResponse(path='/folder/item') + + def testNotFound(self): + self.testResponse(path='/foo/bar', status_expected=404) + + def testUnauthorized(self): + self.testResponse(path='/_protected', status_expected=401) + + def testRedirectMethod(self): + self.testResponse(path='/redirect_method', status_expected=303) + + def testRedirectException(self): + self.testResponse(path='/redirect_exception', status_expected=303) + self.testResponse(path='/folder/redirect_exception', + status_expected=303) + + def testConflictRetry(self): + # Expect the "Accepted" response since the retries will succeed. + self.testResponse(path='/conflict?wait_tries=2', status_expected=202) + + def testFailedConflictRetry(self): + # Expect a "Conflict" response since there will be too many + # conflicts. 
+ self.testResponse(path='/conflict?wait_tries=10', status_expected=409) + + + +def test_suite(): + loader = unittest.TestLoader() + return loader.loadTestsFromTestCase(Tests) + +if __name__=='__main__': + unittest.TextTestRunner().run(test_suite()) diff --git a/linereceiver/linecommandparser.py b/linereceiver/linecommandparser.py new file mode 100644 index 0000000..4ed732b --- /dev/null +++ b/linereceiver/linecommandparser.py @@ -0,0 +1,70 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""Line Command Parser + +$Id$ +""" +from zope.server.interfaces import IStreamConsumer +from zope.interface import implements + + +class LineCommandParser(object): + """Line Command parser. Arguments are left alone for now.""" + + implements(IStreamConsumer) + + # See IStreamConsumer + completed = 0 + inbuf = '' + cmd = '' + args = '' + empty = 0 + + max_line_length = 1024 # Not a hard limit + + + def __init__(self, adj): + """ + adj is an Adjustments object. + """ + self.adj = adj + + + def received(self, data): + 'See IStreamConsumer' + if self.completed: + return 0 # Can't consume any more. + pos = data.find('\n') + datalen = len(data) + if pos < 0: + self.inbuf = self.inbuf + data + if len(self.inbuf) > self.max_line_length: + # Don't accept any more. + self.completed = 1 + return datalen + else: + # Line finished. + s = data[:pos + 1] + self.inbuf = self.inbuf + s + self.completed = 1 + line = self.inbuf.strip() + self.parseLine(line) + return len(s) + + def parseLine(self, line): + parts = line.split(' ', 1) + if len(parts) == 2: + self.cmd, self.args = parts + else: + self.cmd = parts[0] diff --git a/linereceiver/linetask.py b/linereceiver/linetask.py new file mode 100644 index 0000000..b15f9f0 --- /dev/null +++ b/linereceiver/linetask.py @@ -0,0 +1,69 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""Line Task + +$Id$ +""" +import socket +import time +from zope.server.interfaces import ITask +from zope.interface import implements + + +class LineTask(object): + """This is a generic task that can be used with command line + protocols to handle commands in a separate thread. 
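A small sketch of the LineCommandParser above: each parser instance consumes exactly one command line and leaves the rest of the input to the caller.

from zope.server.adjustments import default_adj
from zope.server.linereceiver.linecommandparser import LineCommandParser

parser = LineCommandParser(default_adj)
consumed = parser.received('USER anonymous\r\nPASS guest\r\n')
assert consumed == len('USER anonymous\r\n')   # one command per parser
assert parser.completed
assert (parser.cmd, parser.args) == ('USER', 'anonymous')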
+ """ + implements(ITask) + + def __init__(self, channel, command, m_name): + self.channel = channel + self.m_name = m_name + self.args = command.args + + self.close_on_finish = 0 + + def service(self): + """Called to execute the task. + """ + try: + try: + self.start() + getattr(self.channel, self.m_name)(self.args) + self.finish() + except socket.error: + self.close_on_finish = 1 + if self.channel.adj.log_socket_errors: + raise + except: + self.channel.exception() + finally: + self.channel.end_task(self.close_on_finish) + + def cancel(self): + 'See ITask' + self.channel.close_when_done() + + def defer(self): + 'See ITask' + pass + + def start(self): + now = time.time() + self.start_time = now + + def finish(self): + hit_log = self.channel.server.hit_log + if hit_log is not None: + hit_log.log(self) diff --git a/logger/filelogger.py b/logger/filelogger.py new file mode 100644 index 0000000..5f742a2 --- /dev/null +++ b/logger/filelogger.py @@ -0,0 +1,71 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""File Logger + +$Id$ +""" +from types import StringType + +from zope.server.interfaces.logger import IMessageLogger +from zope.interface import implements + +class FileLogger(object): + """Simple File Logger + """ + + implements(IMessageLogger) + + def __init__(self, file, flush=1, mode='a'): + """pass this either a path or a file object.""" + if type(file) is StringType: + if (file == '-'): + import sys + self.file = sys.stdout + else: + self.file = open(file, mode) + else: + self.file = file + self.do_flush = flush + + def __repr__(self): + return '' % self.file + + def write(self, data): + self.file.write(data) + self.maybe_flush() + + def writeline(self, line): + self.file.writeline(line) + self.maybe_flush() + + def writelines(self, lines): + self.file.writelines(lines) + self.maybe_flush() + + def maybe_flush(self): + if self.do_flush: + self.file.flush() + + def flush(self): + self.file.flush() + + def softspace(self, *args): + pass + + def logMessage(self, message): + 'See IMessageLogger' + if message[-1] not in ('\r', '\n'): + self.write(message + '\n') + else: + self.write(message) diff --git a/logger/m_syslog.py b/logger/m_syslog.py new file mode 100644 index 0000000..92c8a36 --- /dev/null +++ b/logger/m_syslog.py @@ -0,0 +1,177 @@ +# -*- Mode: Python; tab-width: 4 -*- + +# ====================================================================== +# Copyright 1997 by Sam Rushing +# +# All Rights Reserved +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose and without fee is hereby +# granted, provided that the above copyright notice appear in all +# copies and that both that copyright notice and this permission +# notice appear in supporting documentation, and that the name of Sam +# Rushing not be used in advertising or publicity pertaining to +# distribution of the software without specific, written prior +# permission. 
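A quick sketch of the FileLogger above: it accepts an open file-like object, a path, or '-' for stdout, and logMessage() supplies a trailing newline only when one is missing. StringIO is used here purely to keep the example self-contained.

from StringIO import StringIO
from zope.server.logger.filelogger import FileLogger

out = StringIO()
log = FileLogger(out)                 # a path or '-' (stdout) also works
log.logMessage('server started')      # newline added automatically
log.logMessage('shutting down\n')     # already terminated, written as-is
assert out.getvalue() == 'server started\nshutting down\n'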
+#
+# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
+# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
+# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
+# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
+# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+# ======================================================================
+
+"""socket interface to unix syslog.
+On Unix, there are usually two ways of getting to syslog: via a
+local unix-domain socket, or via the TCP service.
+
+Usually "/dev/log" is the unix domain socket. This may be different
+for other systems.
+
+>>> my_client = syslog_client ('/dev/log')
+
+Otherwise, just use the UDP version, port 514.
+
+>>> my_client = syslog_client (('my_log_host', 514))
+
+On win32, you will have to use the UDP version. Note that
+you can use this to log to other hosts (and indeed, multiple
+hosts).
+
+This module is not a drop-in replacement for the python
+<syslog> extension module - the interface is different.
+
+Usage:
+
+>>> c = syslog_client()
+>>> c = syslog_client ('/strange/non_standard_log_location')
+>>> c = syslog_client (('other_host.com', 514))
+>>> c.log ('testing', facility='local0', priority='debug')
+
+"""
+
+# TODO: support named-pipe syslog.
+# [see ftp://sunsite.unc.edu/pub/Linux/system/Daemons/syslog-fifo.tar.z]
+
+# from <linux/sys/syslog.h>:
+# ===========================================================================
+# priorities/facilities are encoded into a single 32-bit quantity, where the
+# bottom 3 bits are the priority (0-7) and the top 28 bits are the facility
+# (0-big number). Both the priorities and the facilities map roughly
+# one-to-one to strings in the syslogd(8) source code. This mapping is
+# included in this file.
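+#
+# (Illustrative note added for review, not part of the original header
+# excerpt: with this encoding, facility local0 (16) at priority debug (7)
+# works out to (16 << 3) | 7 == 135, so the client below puts '<135>' in
+# front of the message it sends.)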
+#
+# priorities (these are ordered)
+
+LOG_EMERG = 0 # system is unusable
+LOG_ALERT = 1 # action must be taken immediately
+LOG_CRIT = 2 # critical conditions
+LOG_ERR = 3 # error conditions
+LOG_WARNING = 4 # warning conditions
+LOG_NOTICE = 5 # normal but significant condition
+LOG_INFO = 6 # informational
+LOG_DEBUG = 7 # debug-level messages
+
+# facility codes
+LOG_KERN = 0 # kernel messages
+LOG_USER = 1 # random user-level messages
+LOG_MAIL = 2 # mail system
+LOG_DAEMON = 3 # system daemons
+LOG_AUTH = 4 # security/authorization messages
+LOG_SYSLOG = 5 # messages generated internally by syslogd
+LOG_LPR = 6 # line printer subsystem
+LOG_NEWS = 7 # network news subsystem
+LOG_UUCP = 8 # UUCP subsystem
+LOG_CRON = 9 # clock daemon
+LOG_AUTHPRIV = 10 # security/authorization messages (private)
+
+# other codes through 15 reserved for system use
+LOG_LOCAL0 = 16 # reserved for local use
+LOG_LOCAL1 = 17 # reserved for local use
+LOG_LOCAL2 = 18 # reserved for local use
+LOG_LOCAL3 = 19 # reserved for local use
+LOG_LOCAL4 = 20 # reserved for local use
+LOG_LOCAL5 = 21 # reserved for local use
+LOG_LOCAL6 = 22 # reserved for local use
+LOG_LOCAL7 = 23 # reserved for local use
+
+priority_names = {
+ "alert": LOG_ALERT,
+ "crit": LOG_CRIT,
+ "debug": LOG_DEBUG,
+ "emerg": LOG_EMERG,
+ "err": LOG_ERR,
+ "error": LOG_ERR, # DEPRECATED
+ "info": LOG_INFO,
+ "notice": LOG_NOTICE,
+ "panic": LOG_EMERG, # DEPRECATED
+ "warn": LOG_WARNING, # DEPRECATED
+ "warning": LOG_WARNING,
+ }
+
+facility_names = {
+ "auth": LOG_AUTH,
+ "authpriv": LOG_AUTHPRIV,
+ "cron": LOG_CRON,
+ "daemon": LOG_DAEMON,
+ "kern": LOG_KERN,
+ "lpr": LOG_LPR,
+ "mail": LOG_MAIL,
+ "news": LOG_NEWS,
+ "security": LOG_AUTH, # DEPRECATED
+ "syslog": LOG_SYSLOG,
+ "user": LOG_USER,
+ "uucp": LOG_UUCP,
+ "local0": LOG_LOCAL0,
+ "local1": LOG_LOCAL1,
+ "local2": LOG_LOCAL2,
+ "local3": LOG_LOCAL3,
+ "local4": LOG_LOCAL4,
+ "local5": LOG_LOCAL5,
+ "local6": LOG_LOCAL6,
+ "local7": LOG_LOCAL7,
+ }
+
+import socket
+
+class syslog_client(object):
+
+ def __init__(self, address='/dev/log'):
+ self.address = address
+ if type(address) == type(''):
+ try: # APUE 13.4.2 specifies /dev/log as a datagram socket
+ self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+ self.socket.connect(address)
+ except: # older Linux may create it as a stream socket
+ self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.socket.connect(address)
+ self.unix = 1
+ else:
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.unix = 0
+
+
+ log_format_string = '<%d>%s\000'
+
+ def log(self, message, facility=LOG_USER, priority=LOG_INFO):
+ message = self.log_format_string % (
+ self.encode_priority(facility, priority),
+ message
+ )
+ if self.unix:
+ self.socket.send(message)
+ else:
+ self.socket.sendto(message, self.address)
+
+ def encode_priority(self, facility, priority):
+ if type(facility) == type(''):
+ facility = facility_names[facility]
+ if type(priority) == type(''):
+ priority = priority_names[priority]
+ return (facility<<3) | priority
+
+ def close(self):
+ if self.unix:
+ self.socket.close()
diff --git a/logger/pythonlogger.py b/logger/pythonlogger.py
index a924d5b..80e190f 100644
--- a/logger/pythonlogger.py
+++ b/logger/pythonlogger.py
@@ -20,7 +20,7 @@
 from zope.server.interfaces.logger import IMessageLogger
 from zope.interface import implements
 
-class PythonLogger:
+class PythonLogger(object):
     """Proxy for Python's logging module"""
 
     implements(IMessageLogger)
diff --git a/logger/resolvinglogger.py
b/logger/resolvinglogger.py new file mode 100644 index 0000000..8ed1e05 --- /dev/null +++ b/logger/resolvinglogger.py @@ -0,0 +1,52 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""Resolving Logger + +$Id$ +""" +from zope.server.interfaces.logger import IRequestLogger +from zope.interface import implements + + +class ResolvingLogger(object): + """Feed (ip, message) combinations into this logger to get a + resolved hostname in front of the message. The message will not + be logged until the PTR request finishes (or fails).""" + + implements(IRequestLogger) + + def __init__(self, resolver, logger): + self.resolver = resolver + # logger is an IMessageLogger + self.logger = logger + + class logger_thunk(object): + def __init__(self, message, logger): + self.message = message + self.logger = logger + + def __call__(self, host, ttl, answer): + if not answer: + answer = host + self.logger.logMessage('%s%s' % (answer, self.message)) + + def logRequest(self, ip, message): + 'See IRequestLogger' + self.resolver.resolve_ptr( + ip, + self.logger_thunk( + message, + self.logger + ) + ) diff --git a/logger/taillogger.py b/logger/taillogger.py new file mode 100644 index 0000000..6d7a688 --- /dev/null +++ b/logger/taillogger.py @@ -0,0 +1,42 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""Tail Logger + +$Id$ +""" +from zope.server.interfaces.logger import IMessageLogger +from zope.interface import implements + +class TailLogger(object): + """Keep track of the last log messages""" + + implements(IMessageLogger) + + def __init__(self, logger, size=500): + self.size = size + self.logger = logger + self.messages = [] + + def logMessage(self, message): + 'See IMessageLogger' + self.messages.append(strip_eol(message)) + if len(self.messages) > self.size: + del self.messages[0] + self.logger.logMessage(message) + + +def strip_eol(line): + while line and line[-1] in '\r\n': + line = line[:-1] + return line diff --git a/logger/unresolvinglogger.py b/logger/unresolvinglogger.py new file mode 100644 index 0000000..992254b --- /dev/null +++ b/logger/unresolvinglogger.py @@ -0,0 +1,31 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. 
+# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""Unresolving Logger + +$Id$ +""" +from zope.server.interfaces.logger import IRequestLogger +from zope.interface import implements + +class UnresolvingLogger(object): + """Just in case you don't want to resolve""" + + implements(IRequestLogger) + + def __init__(self, logger): + self.logger = logger + + def logRequest(self, ip, message): + 'See IRequestLogger' + self.logger.logMessage('%s%s' % (ip, message)) diff --git a/taskthreads.py b/taskthreads.py new file mode 100644 index 0000000..3c52d9c --- /dev/null +++ b/taskthreads.py @@ -0,0 +1,122 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""Threaded Task Dispatcher + +$Id$ +""" +from Queue import Queue, Empty +from thread import allocate_lock, start_new_thread +from time import time, sleep +import logging + +from zope.server.interfaces import ITaskDispatcher +from zope.interface import implements + + +class ThreadedTaskDispatcher(object): + """A Task Dispatcher that creates a thread for each task.""" + + implements(ITaskDispatcher) + + stop_count = 0 # Number of threads that will stop soon. + + def __init__(self): + self.threads = {} # { thread number -> 1 } + self.queue = Queue() + self.thread_mgmt_lock = allocate_lock() + + def handlerThread(self, thread_no): + threads = self.threads + try: + while threads.get(thread_no): + task = self.queue.get() + if task is None: + # Special value: kill this thread. + break + try: + task.service() + except: + logging.exception('Exception during task') + finally: + mlock = self.thread_mgmt_lock + mlock.acquire() + try: + self.stop_count -= 1 + try: del threads[thread_no] + except KeyError: pass + finally: + mlock.release() + + def setThreadCount(self, count): + """See zope.server.interfaces.ITaskDispatcher""" + mlock = self.thread_mgmt_lock + mlock.acquire() + try: + threads = self.threads + thread_no = 0 + running = len(threads) - self.stop_count + while running < count: + # Start threads. + while thread_no in threads: + thread_no = thread_no + 1 + threads[thread_no] = 1 + running += 1 + start_new_thread(self.handlerThread, (thread_no,)) + thread_no = thread_no + 1 + if running > count: + # Stop threads. + to_stop = running - count + self.stop_count += to_stop + for n in range(to_stop): + self.queue.put(None) + running -= 1 + finally: + mlock.release() + + def addTask(self, task): + """See zope.server.interfaces.ITaskDispatcher""" + if task is None: + raise ValueError, "No task passed to addTask()." 
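+ # (Reviewer note, not part of the original code: defer() is called just
+ # before the task is handed to the queue so the task can note that it is
+ # being deferred to another thread; if queueing fails, the task is
+ # cancelled so its channel is not left waiting forever.)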
+ # assert ITask.providedBy(task)
+ try:
+ task.defer()
+ self.queue.put(task)
+ except:
+ task.cancel()
+ raise
+
+ def shutdown(self, cancel_pending=True, timeout=5):
+ """See zope.server.interfaces.ITaskDispatcher"""
+ self.setThreadCount(0)
+ # Ensure the threads shut down.
+ threads = self.threads
+ expiration = time() + timeout
+ while threads:
+ if time() >= expiration:
+ logging.error("%d thread(s) still running" % len(threads))
+ break
+ sleep(0.1)
+ if cancel_pending:
+ # Cancel remaining tasks.
+ try:
+ queue = self.queue
+ while not queue.empty():
+ task = queue.get()
+ if task is not None:
+ task.cancel()
+ except Empty:
+ pass
+
+ def getPendingTasksEstimate(self):
+ """See zope.server.interfaces.ITaskDispatcher"""
+ return self.queue.qsize()
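
For reviewers who want to exercise the backported dispatcher by hand, here is a minimal sketch. Only ThreadedTaskDispatcher and its setThreadCount/addTask/shutdown methods come from this patch; the EchoTask class, the thread count, and the sleep are invented for illustration.

# Not part of the patch: a hypothetical smoke test for ThreadedTaskDispatcher.
from time import sleep

from zope.server.taskthreads import ThreadedTaskDispatcher


class EchoTask:
    """Bare-bones object providing the methods the dispatcher calls."""

    def service(self):
        # Runs inside one of the dispatcher's worker threads.
        print 'servicing task'

    def defer(self):
        # Called just before the task is queued.
        pass

    def cancel(self):
        # Called if the task is dropped without being serviced.
        print 'task cancelled'


dispatcher = ThreadedTaskDispatcher()
dispatcher.setThreadCount(4)     # start four worker threads
dispatcher.addTask(EchoTask())   # queued, then serviced by a worker
sleep(0.5)                       # give a worker a moment to pick it up
dispatcher.shutdown()            # stop workers, cancel anything still queued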