Skip to content

Commit

Permalink
Partial python 3 support
Browse files Browse the repository at this point in the history
  • Loading branch information
kedder committed Feb 25, 2013
1 parent aeb1351 commit 70965b7
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 35 deletions.
6 changes: 3 additions & 3 deletions setup.py
Expand Up @@ -54,12 +54,13 @@
'zope.securitypolicy',
'zope.testing',
'zope.traversing',
]),
],
testbrowser=['zope.testbrowser[wsgi] >= 4.0.0']),
install_requires=[
'setuptools',
'WebTest',
'ZConfig',
'ZODB3',
'ZODB',
'zope.app.appsetup >= 3.14',
'zope.processlifetime',
'zope.app.publication',
Expand All @@ -75,7 +76,6 @@
'zope.processlifetime',
'zope.session',
'zope.site',
'zope.testbrowser[wsgi] >= 4.0.0',
'zope.testing',
'zope.traversing',
],
Expand Down
14 changes: 8 additions & 6 deletions src/zope/app/wsgi/__init__.py
Expand Up @@ -15,6 +15,8 @@
$Id$
"""
from __future__ import print_function

import os
import sys
import logging
Expand All @@ -23,7 +25,7 @@
import zope.app.appsetup.product

from zope.event import notify
from zope.interface import implements
from zope.interface import implementer
from zope.publisher.publish import publish
from zope.publisher.interfaces.logginginfo import ILoggingInfo

Expand All @@ -32,14 +34,14 @@
from zope.app.wsgi import interfaces


@implementer(interfaces.IWSGIApplication)
class WSGIPublisherApplication(object):
"""A WSGI application implementation for the zope publisher
Instances of this class can be used as a WSGI application object.
The class relies on a properly initialized request factory.
"""
implements(interfaces.IWSGIApplication)

def __init__(self, db=None, factory=HTTPPublicationRequestFactory,
handle_errors=True):
Expand Down Expand Up @@ -89,11 +91,11 @@ def __call__(self, environ, start_response):
try:
app = super(PMDBWSGIPublisherApplication, self)
return app.__call__(environ, start_response)
except Exception, error:
except Exception as error:
import sys
import pdb
print "%s:" % sys.exc_info()[0]
print sys.exc_info()[1]
print("%s:" % sys.exc_info()[0])
print(sys.exc_info()[1])
try:
pdb.post_mortem(sys.exc_info()[2])
raise
Expand All @@ -120,7 +122,7 @@ def config(configfile, schemafile=None, features=()):
options, handlers = ZConfig.loadConfig(schema, configfile)
else:
options, handlers = ZConfig.loadConfigFile(schema, configfile)
except ZConfig.ConfigurationError, msg:
except ZConfig.ConfigurationError as msg:
sys.stderr.write("Error: %s\n" % str(msg))
sys.exit(2)

Expand Down
33 changes: 33 additions & 0 deletions src/zope/app/wsgi/_compat.py
@@ -0,0 +1,33 @@
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Python-3 Compatibility module
"""

import sys
PYTHON2 = sys.version_info[0] == 2
PYTHON3 = sys.version_info[0] == 3

if PYTHON2:
_u = unicode
import xmlrpclib as xmlrpcclient
import httplib as httpclient
import types
FileType = types.FileType
else:
_u = str
import xmlrpc.client as xmlrpcclient
import http.client as httpclient
from io import IOBase
FileType = IOBase

8 changes: 5 additions & 3 deletions src/zope/app/wsgi/fileresult.py
Expand Up @@ -23,9 +23,11 @@
from zope.publisher.interfaces.http import IResult
from zope.security.proxy import removeSecurityProxy

from zope.app.wsgi._compat import FileType


@interface.implementer(IResult)
class FallbackWrapper:
interface.implements(IResult)

def __init__(self, f):
self.close = f.close
Expand All @@ -41,7 +43,7 @@ def __iter__(self):
break


@component.adapter(file, zope.publisher.interfaces.http.IHTTPRequest)
@component.adapter(FileType, zope.publisher.interfaces.http.IHTTPRequest)
@interface.implementer(zope.publisher.http.IResult)
def FileResult(f, request):
f = removeSecurityProxy(f)
Expand All @@ -65,7 +67,7 @@ def FileResult(f, request):
_tfile = tempfile.TemporaryFile()
_tfile.close()
_tfile = _tfile.__class__
if _tfile is file:
if issubclass(_tfile, FileType):
# need a fake one. Sigh
class _tfile:
pass
Expand Down
2 changes: 1 addition & 1 deletion src/zope/app/wsgi/interfaces.py
Expand Up @@ -81,8 +81,8 @@ class IWSGIPublisherApplicationCreatedEvent(zope.interface.Interface):
application = zope.interface.Attribute("The WSGI application.")


@zope.interface.implementer(IWSGIPublisherApplicationCreatedEvent)
class WSGIPublisherApplicationCreated(object):
zope.interface.implements(IWSGIPublisherApplicationCreatedEvent)

def __init__(self, application):
self.application = application
144 changes: 122 additions & 22 deletions src/zope/app/wsgi/testlayer.py
Expand Up @@ -11,18 +11,20 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from StringIO import StringIO
import httplib
import xmlrpclib
from io import BytesIO
import base64
import re

import transaction
from zope.app.appsetup.testlayer import ZODBLayer
from zope.app.wsgi import WSGIPublisherApplication
import zope.testbrowser.wsgi
#import zope.testbrowser.wsgi
from webtest import TestRequest

# BBB
from zope.testbrowser.wsgi import Browser
#from zope.testbrowser.wsgi import Browser

from zope.app.wsgi._compat import httpclient, xmlrpcclient


class TransactionMiddleware(object):
Expand All @@ -46,7 +48,100 @@ def __call__(self, environ, start_response):
self.root_factory()._p_jar.sync()


class BrowserLayer(zope.testbrowser.wsgi.Layer, ZODBLayer):
basicre = re.compile('Basic (.+)?:(.+)?$')


def auth_header(header):
"""This function takes an authorization HTTP header and encode the
couple user, password into base 64 like the HTTP protocol wants
it.
"""
match = basicre.match(header)
if match:
u, p = match.group(1, 2)
if u is None:
u = ''
if p is None:
p = ''
plain = '%s:%s' % (u, p)
auth = base64.encodestring(plain.encode('utf-8'))
return 'Basic %s' % str(auth.rstrip().decode('latin1'))
return header


def is_wanted_header(header):
"""Return True if the given HTTP header key is wanted.
"""
key, value = header
return key.lower() not in ('x-content-type-warning', 'x-powered-by')


class AuthorizationMiddleware(object):
"""This middleware makes the WSGI application compatible with the
HTTPCaller behavior defined in zope.app.testing.functional:
- It modifies the HTTP Authorization header to encode user and
password into base64 if it is Basic authentication.
"""

def __init__(self, wsgi_stack):
self.wsgi_stack = wsgi_stack

def __call__(self, environ, start_response):
# Handle authorization
auth_key = 'HTTP_AUTHORIZATION'
if auth_key in environ:
environ[auth_key] = auth_header(environ[auth_key])

# Remove unwanted headers
def application_start_response(status, headers, exc_info=None):
headers = filter(is_wanted_header, headers)
start_response(status, headers)

for entry in self.wsgi_stack(environ, application_start_response):
yield entry


_APP_UNDER_TEST = None # setup and torn down by the Layer class

# This is a copy of zope.testbrowser.wsgi.Layer, made to drop dependency on
# zope.testbrowser, that, in turn depends on non python-3 compatible mechanize.
class WSGILayer(object):
"""Test layer which sets up WSGI application for use with
WebTest/testbrowser.
"""

__bases__ = ()
__name__ = 'Layer'

@classmethod
def get_app(cls):
return _APP_UNDER_TEST

def make_wsgi_app(self):
# Override this method in subclasses of this layer in order to set up
# the WSGI application.
raise NotImplementedError

def cooperative_super(self, method_name):
# Calling `super` for multiple inheritance:
method = getattr(super(WSGILayer, self), method_name, None)
if method is not None:
method()

def setUp(self):
self.cooperative_super('setUp')
global _APP_UNDER_TEST
if _APP_UNDER_TEST is not None:
raise AssertionError("Already Setup")
_APP_UNDER_TEST = self.make_wsgi_app()

def tearDown(self):
global _APP_UNDER_TEST
_APP_UNDER_TEST = None
self.cooperative_super('tearDown')

class BrowserLayer(WSGILayer, ZODBLayer):
"""This create a test layer with a test database and register a wsgi
application to use that test database.
Expand All @@ -66,7 +161,7 @@ def make_wsgi_app(self):
# off of that in testSetUp()
fake_db = object()
self._application = WSGIPublisherApplication(fake_db)
return zope.testbrowser.wsgi.AuthorizationMiddleware(
return AuthorizationMiddleware(
TransactionMiddleware(
self.getRootFolder,
self.setup_middleware(self._application)))
Expand Down Expand Up @@ -102,27 +197,32 @@ def getHeader(self, name, default=None):
return self.response.headers.get(name, default)

def getHeaders(self):
return self.response.headerlist
return sorted(self.response.headerlist)

def getBody(self):
return self.response.body

def getOutput(self):
parts = ['HTTP/1.0 ' + self.response.status]
parts += map('%s: %s'.__mod__, self.response.headerlist)
if self.response.body:
parts += ['', self.response.body]
return '\n'.join(parts)
parts = [b'HTTP/1.0 ' + self.response.status.encode('latin1')]
parts += [('%s: %s' % h).encode('latin1') for h in self.getHeaders()]

__str__ = getOutput
body = self.response.body
if body:
if not isinstance(body, bytes):
body = body.encode('utf-8')
parts += [b'', body]
return b'\n'.join(parts)

def __str__(self):
out = self.getOutput()
return out.decode('latin1')

def http(string, handle_errors=True):
app = zope.testbrowser.wsgi.Layer.get_app()
app = WSGILayer.get_app()
if app is None:
raise NotInBrowserLayer(NotInBrowserLayer.__doc__)

request = TestRequest.from_file(StringIO(string))
request = TestRequest.from_file(BytesIO(string))
request.environ['wsgi.handleErrors'] = handle_errors
response = request.get_response(app)
return FakeResponse(response)
Expand All @@ -134,11 +234,11 @@ def __init__(self, data):
self.data = data

def makefile(self, mode, bufsize=None):
return StringIO(self.data)
return BytesIO(self.data)


class XMLRPCTestTransport(xmlrpclib.Transport):
"""xmlrpclib transport that delegates to http().
class XMLRPCTestTransport(xmlrpcclient.Transport):
"""xmlrpc.client lib transport that delegates to http().
It can be used like a normal transport, including support for basic
authentication.
Expand Down Expand Up @@ -167,13 +267,13 @@ def request(self, host, handler, request_body, verbose=0):
headers = response.getHeaders()

if errcode != 200:
raise xmlrpclib.ProtocolError(
raise xmlrpcclient.ProtocolError(
host + handler,
errcode, errmsg,
headers
)

res = httplib.HTTPResponse(FakeSocket(response.getBody()))
res = httpclient.HTTPResponse(FakeSocket(response.getBody()))
res.begin()
return self.parse_response(res)

Expand All @@ -188,4 +288,4 @@ def XMLRPCServerProxy(uri, transport=None, encoding=None,
transport = XMLRPCTestTransport()
if isinstance(transport, XMLRPCTestTransport):
transport.handleErrors = handleErrors
return xmlrpclib.ServerProxy(uri, transport, encoding, verbose, allow_none)
return xmlrpcclient.ServerProxy(uri, transport, encoding, verbose, allow_none)

0 comments on commit 70965b7

Please sign in to comment.