Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Implement xmlrpc options, such as allow_none

Implement allow_none, use_datetime, and charset.
An includeme method has been added to configure those params
via the ini file.
  • Loading branch information...
commit d3bbc3b206beeb2811f5301d22bb1bc3e5605359 1 parent f5c20ab
Guillaume Gauvrit authored
Showing with 150 additions and 13 deletions.
  1. +25 −9 pyramid_xmlrpc/__init__.py
  2. +125 −4 pyramid_xmlrpc/tests.py
View
34 pyramid_xmlrpc/__init__.py
@@ -1,31 +1,35 @@
import xmlrpclib
import webob
+from pyramid.settings import asbool
-def xmlrpc_marshal(data):
+
+def xmlrpc_marshal(data, allow_none=False, encoding=None):
""" Marshal a Python data structure into an XML document suitable
for use as an XML-RPC response and return the document. If
``data`` is an ``xmlrpclib.Fault`` instance, it will be marshalled
into a suitable XML-RPC fault response."""
if isinstance(data, xmlrpclib.Fault):
- return xmlrpclib.dumps(data)
+ return xmlrpclib.dumps(data, allow_none=allow_none, encoding=encoding)
else:
- return xmlrpclib.dumps((data,), methodresponse=True)
+ return xmlrpclib.dumps((data,), methodresponse=True,
+ allow_none=allow_none,
+ encoding=encoding)
-def xmlrpc_response(data):
+def xmlrpc_response(data, allow_none=False, encoding=None):
""" Marshal a Python data structure into a webob ``Response``
object with a body that is an XML document suitable for use as an
XML-RPC response with a content-type of ``text/xml`` and return
the response."""
- xml = xmlrpc_marshal(data)
+ xml = xmlrpc_marshal(data, allow_none=allow_none, encoding=encoding)
response = webob.Response(xml)
response.content_type = 'text/xml'
response.content_length = len(xml)
return response
-def parse_xmlrpc_request(request):
+def parse_xmlrpc_request(request, use_datetime=False):
""" Deserialize the body of a request from an XML-RPC request
document into a set of params and return a two-tuple. The first
element in the tuple is the method params as a sequence, the
@@ -33,7 +37,7 @@ def parse_xmlrpc_request(request):
if request.content_length > (1 << 23):
# protect from DOS (> 8MB body)
raise ValueError('Body too large (%s bytes)' % request.content_length)
- params, method = xmlrpclib.loads(request.body)
+ params, method = xmlrpclib.loads(request.body, use_datetime)
return params, method
@@ -103,6 +107,9 @@ class XMLRPCView:
Subclass and add your methods as described in the documentation.
"""
+ allow_none = False
+ charset = None
+ use_datetime = False
def __init__(self, context, request):
self.context = context
@@ -120,5 +127,14 @@ def __call__(self):
want XML-RPC to continute to work!
"""
- params, method = parse_xmlrpc_request(self.request)
- return xmlrpc_response(getattr(self, method)(*params))
+ params, method = parse_xmlrpc_request(self.request, self.use_datetime)
+ return xmlrpc_response(getattr(self, method)(*params), self.allow_none,
+ self.charset)
+
+
+def includeme(config):
+ settings = config.registry.settings
+ XMLRPCView.allow_none = asbool(settings.get('xmlrpc.allow_none', False))
+ XMLRPCView.use_datetime = asbool(settings.get('xmlrpc.use_datetime',
+ False))
+ XMLRPCView.charset = settings.get('xmlrpc.charset')
View
129 pyramid_xmlrpc/tests.py
@@ -1,8 +1,10 @@
+# -*- coding: utf-8 -*-
import unittest
from pyramid import testing
class TestXMLRPCMarshal(unittest.TestCase):
+
def _callFUT(self, value):
from pyramid_xmlrpc import xmlrpc_marshal
return xmlrpc_marshal(value)
@@ -22,9 +24,10 @@ def test_xmlrpc_marshal_fault(self):
class TestXMLRPResponse(unittest.TestCase):
- def _callFUT(self, value):
+
+ def _callFUT(self, value, allow_none=False, charset=None):
from pyramid_xmlrpc import xmlrpc_response
- return xmlrpc_response(value)
+ return xmlrpc_response(value, allow_none, charset)
def test_xmlrpc_response(self):
import xmlrpclib
@@ -36,11 +39,28 @@ def test_xmlrpc_response(self):
self.assertEqual(response.content_length, len(response.body))
self.assertEqual(response.status, '200 OK')
+ def test_xmlrpc_response_nil(self):
+ import xmlrpclib
+ data = None
+ self.assertRaises(TypeError, self._callFUT, data)
+ response = self._callFUT(data, allow_none=True).body
+ self.assertIsNone(xmlrpclib.loads(response)[0][0])
+
+ def test_xmlrpc_response_charset(self):
+ import xmlrpclib
+ data = u"é"
+ self.assertRaises(UnicodeEncodeError, self._callFUT, data, False,
+ "us-ascii")
+ response = self._callFUT(data, charset="iso-8859-1").body
+ self.assertEqual(response.split('>', 1)[0],
+ "<?xml version='1.0' encoding='iso-8859-1'?")
+
class TestParseXMLRPCRequest(unittest.TestCase):
- def _callFUT(self, request):
+
+ def _callFUT(self, request, use_datetime=0):
from pyramid_xmlrpc import parse_xmlrpc_request
- return parse_xmlrpc_request(request)
+ return parse_xmlrpc_request(request, use_datetime)
def test_normal(self):
import xmlrpclib
@@ -58,8 +78,23 @@ def test_toobig(self):
request.content_length = 1 << 24
self.assertRaises(ValueError, self._callFUT, request)
+ def test_datetime(self):
+ import datetime
+ import xmlrpclib
+ from pyramid_xmlrpc import parse_xmlrpc_request
+ param = datetime.datetime.now()
+ packet = xmlrpclib.dumps((param,), methodname='__call__')
+ request = testing.DummyRequest()
+ request.body = packet
+ request.content_length = len(packet)
+ params, method = self._callFUT(request)
+ self.assertEqual(params[0].__class__, xmlrpclib.DateTime)
+ params, method = self._callFUT(request, use_datetime=True)
+ self.assertEqual(params[0].__class__, datetime.datetime)
+
class TestDecorator(unittest.TestCase):
+
def _callFUT(self, unwrapped):
from pyramid_xmlrpc import xmlrpc_view
return xmlrpc_view(unwrapped)
@@ -113,3 +148,89 @@ def a_method(self, param):
response = instance()
self.assertEqual(response.body, xmlrpclib.dumps((param,),
methodresponse=True))
+
+ def test_marshalling_none(self):
+ from pyramid_xmlrpc import XMLRPCView
+
+ class Test(XMLRPCView):
+ allow_none = True
+
+ def a_method(self, param):
+ return None
+
+ import xmlrpclib
+ packet = xmlrpclib.dumps((None,), methodname='a_method',
+ allow_none=True)
+ request = testing.DummyRequest()
+ request.body = packet
+ request.content_length = len(packet)
+
+ # instantiate the view
+ context = testing.DummyModel()
+ instance = Test(context, request)
+ # exercise it
+ response = instance()
+ self.assertEqual(response.body, xmlrpclib.dumps((None,),
+ allow_none=True,
+ methodresponse=True))
+
+ def test_parse_datetime(self):
+ from pyramid_xmlrpc import XMLRPCView
+
+ class Test(XMLRPCView):
+ use_datetime = True
+
+ def a_method(self, param):
+ Test.datetime = param
+ return param
+
+ import xmlrpclib
+ import datetime
+ packet = xmlrpclib.dumps((datetime.datetime.now(),),
+ methodname='a_method')
+ request = testing.DummyRequest()
+ request.body = packet
+ request.content_length = len(packet)
+
+ # instantiate the view
+ context = testing.DummyModel()
+ instance = Test(context, request)
+ # exercise it
+ response = instance()
+ self.assertEqual(Test.datetime.__class__, datetime.datetime)
+
+ def test_charset(self):
+ from pyramid_xmlrpc import XMLRPCView
+
+ class Test(XMLRPCView):
+ charset = 'iso-8859-1'
+
+ def a_method(self, param):
+ return param
+
+ import xmlrpclib
+ packet = xmlrpclib.dumps(('param',), methodname='a_method')
+ request = testing.DummyRequest()
+ request.body = packet
+ request.content_length = len(packet)
+
+ # instantiate the view
+ context = testing.DummyModel()
+ instance = Test(context, request)
+ # exercise it
+ response = instance()
+ self.assertEqual(response.body.split('>', 1)[0],
+ "<?xml version='1.0' encoding='iso-8859-1'?")
+
+
+class TestConfig(unittest.TestCase):
+
+ def test_includeme(self):
+ from pyramid_xmlrpc import includeme, XMLRPCView
+
+ settings = {'xmlrpc.charset': 'iso-8859-15',
+ 'xmlrpc.allow_none': 'true'}
+ self.config = testing.setUp(settings=settings)
+ self.config.include(includeme)
+ self.assertEqual(XMLRPCView.charset, 'iso-8859-15')
+ self.assertTrue(XMLRPCView.allow_none)
Please sign in to comment.
Something went wrong with that request. Please try again.