JSONRPC Library (jsonrpclib)
This library is a JSON-RPC v.2 (proposed) implementation which
follows the xmlrpclib API for portability between clients. It
uses the same Server / ServerProxy, loads, dumps, etc. syntax,
while providing features not present in XML-RPC like:
* Keyword arguments
* Notifications
* Versioning
* Batches and batch notifications
Eventually, I'll add a SimpleXMLRPCServer compatible library,
and other things to tie the thing off nicely. :)
For a quick-start, just open a console and type the following,
replacing the server address, method, and parameters appropriately:
>>> import jsonrpclib
>>> server = jsonrpclib.Server('http://localhost:8181')
>>> server.add(5, 6)
>>> server._notify.add(5, 6)
>>> batch = jsonrpclib.MultiCall(server)
>>> batch.add(3, 50)
>>> batch.add(2, 3)
>>> batch._notify.add(3, 5)
>>> batch()
[53, 5]
See the jsonrpclib project documentation for more info.
import types
from xmlrpclib import Transport as XMLTransport
from xmlrpclib import SafeTransport as XMLSafeTransport
from xmlrpclib import ServerProxy as XMLServerProxy
from xmlrpclib import _Method as XML_Method
import string
import random
# Library includes
from jsonrpclib import config
from jsonrpclib import history
# JSON library importing
# Pick a JSON backend. Preference order: cjson, then the standard
# library json module, then simplejson. Exactly one of the two names
# below ends up bound to a module; the other stays None so jdumps /
# jloads can tell which backend is active.
cjson = None
json = None
try:
    import cjson
except ImportError:
    try:
        import json
    except ImportError:
        try:
            import simplejson as json
        except ImportError:
            raise ImportError(
                'You must have the cjson, json, or simplejson ' +
                'module(s) available.'
            )

# Alphabet used by random_id() when generating request ids.
IDCHARS = string.ascii_lowercase + string.digits
class UnixSocketMissing(Exception):
    """
    Just a properly named Exception if Unix Sockets usage is
    attempted on a platform that doesn't support them (Windows)
    """
    pass
# JSON Abstractions
def jdumps(obj, encoding='utf-8'):
    """Serialize ``obj`` to a JSON string with the active JSON backend.

    ``encoding`` is only forwarded to the ``json`` backend; the cjson
    backend is called with the object alone.
    """
    # Do 'serialize' test at some point for other classes
    if not cjson:
        return json.dumps(obj, encoding=encoding)
    return cjson.encode(obj)
def jloads(json_string):
    """Parse ``json_string`` with the active JSON backend and return the result."""
    # cjson wins when it was importable; otherwise fall back to json.
    decode = cjson.decode if cjson else json.loads
    return decode(json_string)
# XMLRPClib re-implementations
class ProtocolError(Exception):
    """Raised for JSON-RPC protocol violations and server error responses."""
    pass
class TransportMixIn(object):
    """ Just extends the XMLRPC transport where necessary. """
    user_agent = config.user_agent
    # for Python 2.7 support
    _connection = (None, None)
    _extra_headers = []

    def send_content(self, connection, request_body):
        """Write the JSON-RPC headers and then the request body.

        ``connection`` is the HTTP connection object handed over by the
        xmlrpclib transport; ``request_body`` is the serialized request.
        """
        connection.putheader("Content-Type", "application/json-rpc")
        connection.putheader("Content-Length", str(len(request_body)))
        # The header section has to be terminated before any body
        # bytes are written.
        connection.endheaders()
        if request_body:
            connection.send(request_body)

    def getparser(self):
        """Return a ``(parser, target)`` pair that buffers the raw response."""
        target = JSONTarget()
        return JSONParser(target), target
class JSONParser(object):
    """Parser half of the ``(parser, target)`` pair built by getparser().

    It does no parsing of its own: every chunk is passed straight to the
    target, which accumulates the raw response text.
    """

    def __init__(self, target):
        self.target = target

    def feed(self, data):
        """Forward one chunk of response data to the target."""
        self.target.feed(data)

    def close(self):
        """Nothing to finalize; the target owns the collected data."""
        pass
class JSONTarget(object):
    """Collects response chunks and joins them into one string on close()."""

    def __init__(self):
        self.data = []

    def feed(self, data):
        """Remember one chunk of response data."""
        self.data.append(data)

    def close(self):
        """Return all chunks fed so far, concatenated in arrival order."""
        return ''.join(self.data)
class Transport(TransportMixIn, XMLTransport):
    """xmlrpclib's Transport with the JSON overrides from TransportMixIn."""

    def __init__(self):
        # Initialize both bases explicitly; the mix-in comes first in
        # the MRO so its send_content / getparser take precedence.
        TransportMixIn.__init__(self)
        XMLTransport.__init__(self)
class SafeTransport(TransportMixIn, XMLSafeTransport):
    """xmlrpclib's SafeTransport with the JSON overrides from TransportMixIn."""

    def __init__(self):
        # Initialize both bases explicitly; the mix-in comes first in
        # the MRO so its send_content / getparser take precedence.
        TransportMixIn.__init__(self)
        XMLSafeTransport.__init__(self)
from httplib import HTTP, HTTPConnection
from socket import socket

# AF_UNIX is only exported by the socket module on platforms that
# support Unix domain sockets, so probe for it instead of assuming it.
USE_UNIX_SOCKETS = False
try:
    from socket import AF_UNIX, SOCK_STREAM
    USE_UNIX_SOCKETS = True
except ImportError:
    pass

if USE_UNIX_SOCKETS:

    class UnixHTTPConnection(HTTPConnection):
        """HTTP connection that talks over a Unix domain socket."""

        def connect(self):
            self.sock = socket(AF_UNIX, SOCK_STREAM)
            # NOTE(review): self.host is expected to carry the socket
            # path for unix:// URIs -- confirm against ServerProxy.
            self.sock.connect(self.host)

    class UnixHTTP(HTTP):
        """httplib.HTTP wrapper bound to UnixHTTPConnection."""
        _connection_class = UnixHTTPConnection

    class UnixTransport(TransportMixIn, XMLTransport):
        """JSON-RPC transport that connects through UnixHTTP."""

        def make_connection(self, host):
            host, extra_headers, x509 = self.get_host_info(host)
            return UnixHTTP(host)
class ServerProxy(XMLServerProxy):
    """
    Unfortunately, much more of this class has to be copied since
    so much of it does the serialization.

    Attribute access returns a callable that issues a JSON-RPC request;
    the ``_notify`` namespace does the same for notifications, e.g.
    ``server.add(5, 6)`` and ``server._notify.add(5, 6)``.
    """

    def __init__(self, uri, transport=None, encoding=None,
                 verbose=0, version=None):
        """Set up host, handler and transport for ``uri``.

        ``uri`` must use the http, https or unix scheme; anything else
        raises IOError. A unix URI on a platform without Unix domain
        sockets raises UnixSocketMissing. When ``transport`` is None one
        is chosen from the scheme; ``version`` defaults to
        ``config.version``.
        """
        import socket as socket_module
        import urllib
        if not version:
            version = config.version
        self.__version = version
        schema, uri = urllib.splittype(uri)
        if schema not in ('http', 'https', 'unix'):
            raise IOError('Unsupported JSON-RPC protocol.')
        if schema == 'unix':
            if not hasattr(socket_module, 'AF_UNIX'):
                # Don't like the "generic" Exception...
                raise UnixSocketMissing("Unix sockets not available.")
            self.__host = uri
            self.__handler = '/'
        else:
            self.__host, self.__handler = urllib.splithost(uri)
            if not self.__handler:
                # Not sure if this is in the JSON spec?
                # An empty path would yield an invalid request line, so
                # fall back to the root path.
                self.__handler = '/'
        if transport is None:
            if schema == 'unix':
                transport = UnixTransport()
            elif schema == 'https':
                transport = SafeTransport()
            else:
                transport = Transport()
        self.__transport = transport
        self.__encoding = encoding
        self.__verbose = verbose

    def _request(self, methodname, params, rpcid=None):
        """Send a request and return the ``result`` member of the reply.

        Error replies are turned into exceptions by check_for_errors().
        """
        request = dumps(params, methodname, encoding=self.__encoding,
                        rpcid=rpcid, version=self.__version)
        response = self._run_request(request)
        check_for_errors(response)
        return response['result']

    def _request_notify(self, methodname, params, rpcid=None):
        """Send a notification; nothing is returned to the caller."""
        request = dumps(params, methodname, encoding=self.__encoding,
                        rpcid=rpcid, version=self.__version, notify=True)
        response = self._run_request(request, notify=True)
        check_for_errors(response)
        return None

    def _run_request(self, request, notify=None):
        """Post the serialized ``request`` and return the decoded reply.

        Returns None when the transport hands back an empty response.
        ``notify`` is accepted for interface compatibility and is not
        consulted here.
        """
        response = self.__transport.request(
            self.__host,
            self.__handler,
            request,
            verbose=self.__verbose
        )
        # Here, the XMLRPC library translates a single list
        # response to the single value -- should we do the
        # same, and require a tuple / list to be passed to
        # the response object, or expect the Server to be
        # outputting the response appropriately?
        if not response:
            return None
        return_obj = loads(response)
        return return_obj

    def __getattr__(self, name):
        # Same as original, just with new _Method reference
        return _Method(self._request, name)

    @property
    def _notify(self):
        # Just like __getattr__, but with notify namespace.
        return _Notify(self._request_notify)
class _Method(XML_Method):
    """Callable bound to one remote method name.

    Calling it forwards either the positional or the keyword arguments
    to the stored send function; attribute access yields a new _Method
    whose name is extended with a dot and the attribute name.
    """

    def __call__(self, *args, **kwargs):
        if args and kwargs:
            raise ProtocolError(
                'Cannot use both positional and keyword arguments '
                '(according to JSON-RPC spec.)')
        # Positional arguments win when present; otherwise the (possibly
        # empty) keyword dict is sent.
        params = args if args else kwargs
        return self.__send(self.__name, params)

    def __getattr__(self, name):
        dotted_name = "%s.%s" % (self.__name, name)
        return _Method(self.__send, dotted_name)

    def __repr__(self):
        class_name = self.__class__.__name__
        return '<{} "{}">'.format(class_name, self.__name)

    def __str__(self):
        return self.__repr__()

    def __dir__(self):
        instance_attrs = self.__dict__
        return instance_attrs.keys()
class _Notify(object):
def __init__(self, request):
self._request = request
def __getattr__(self, name):
return _Method(self._request, name)
# Batch implementation
class MultiCallMethod(object):
    """One queued call inside a batch.

    Calling the object records its parameters; ``request()`` serializes
    the recorded call. Attribute access extends the method name with a
    dotted suffix and returns the same object.
    """

    def __init__(self, method, notify=False):
        self.method = method
        self.params = []
        self.notify = notify

    def __call__(self, *args, **kwargs):
        """Record the call parameters.

        Raises ProtocolError when positional and keyword arguments are
        mixed.
        """
        if len(kwargs) > 0 and len(args) > 0:
            raise ProtocolError('JSON-RPC does not support both ' +
                                'positional and keyword arguments.')
        if len(kwargs) > 0:
            self.params = kwargs
        else:
            # Only fall back to the positional tuple when no keyword
            # arguments were given, so kwargs are not overwritten.
            self.params = args

    def request(self, encoding=None, rpcid=None):
        """Return this call serialized as a JSON-RPC 2.0 request string."""
        return dumps(self.params, self.method, version=2.0,
                     encoding=encoding, rpcid=rpcid, notify=self.notify)

    def __repr__(self):
        return '%s' % self.request()

    def __getattr__(self, method):
        # Supports dotted names such as batch.namespace.method(...).
        new_method = '%s.%s' % (self.method, method)
        self.method = new_method
        return self
class MultiCallNotify(object):
    """Notification namespace of a MultiCall batch."""

    def __init__(self, multicall):
        self.multicall = multicall

    def __getattr__(self, name):
        """Create a notification job for ``name`` and queue it on the batch."""
        new_job = MultiCallMethod(name, notify=True)
        # The job has to land in the owning batch's queue, otherwise it
        # would never be part of the request sent by MultiCall.
        self.multicall._job_list.append(new_job)
        return new_job
class MultiCallIterator(object):
    """Sequence-like view over the decoded responses of a batch call.

    Indexing or iterating yields the ``result`` member of each response;
    a response carrying an error raises through check_for_errors().
    """

    def __init__(self, results):
        self.results = results

    def __iter__(self):
        # The generator simply ends after the last item; raising
        # StopIteration by hand is an error under PEP 479.
        for i in range(len(self.results)):
            yield self[i]

    def __getitem__(self, i):
        item = self.results[i]
        check_for_errors(item)
        return item['result']

    def __len__(self):
        return len(self.results)
class MultiCall(object):
    """Collects calls and sends them to the server as one batch request.

    Attribute access queues a call (``batch.add(3, 50)``), the
    ``_notify`` namespace queues a notification, and calling the batch
    object sends everything queued so far.
    """

    def __init__(self, server):
        self._server = server
        self._job_list = []

    def _request(self):
        """Send all queued jobs and return a MultiCallIterator.

        Returns None without contacting the server when nothing is
        queued. The queue is emptied once the request has been sent.
        """
        if not self._job_list:
            # Should we alert? This /is/ pretty obvious.
            return None
        request_body = '[ {0} ]'.format(
            ','.join([job.request() for job in self._job_list]))
        responses = self._server._run_request(request_body)
        del self._job_list[:]
        if not responses:
            responses = []
        return MultiCallIterator(responses)

    @property
    def _notify(self):
        return MultiCallNotify(self)

    def __getattr__(self, name):
        new_job = MultiCallMethod(name)
        # Queue the job so that _request() includes it in the batch.
        self._job_list.append(new_job)
        return new_job

    __call__ = _request
# These lines conform to xmlrpclib's "compatibility" line.
# Not really sure if we should include these, but oh well.
# The module quick-start uses this alias: jsonrpclib.Server(...).
Server = ServerProxy
class Fault(object):
    """JSON-RPC error class.

    Holds an error code, a message and optionally the id of the request
    the error belongs to.
    """

    def __init__(self, code=-32000, message='Server error', rpcid=None):
        self.faultCode = code
        self.faultString = message
        self.rpcid = rpcid

    def error(self):
        """Return the error as a ``{'code': ..., 'message': ...}`` dict."""
        return {'code': self.faultCode, 'message': self.faultString}

    def response(self, rpcid=None, version=None):
        """Serialize this fault as a JSON-RPC error response string.

        A truthy ``rpcid`` replaces the stored id; ``version`` defaults
        to ``config.version``.
        """
        if not version:
            version = config.version
        if rpcid:
            self.rpcid = rpcid
        return dumps(
            self, methodresponse=True, rpcid=self.rpcid, version=version
        )

    def __repr__(self):
        return '<Fault %s: %s>' % (self.faultCode, self.faultString)
def random_id(length=8):
    """Return a string of ``length`` characters picked at random from IDCHARS."""
    picks = [random.choice(IDCHARS) for _ in range(length)]
    return ''.join(picks)
class Payload(dict):
    """Builds request, notification, response and error dictionaries.

    ``rpcid`` is the id placed in the generated payloads and ``version``
    the protocol version (``config.version`` when not given). Payloads
    for version 2 and above carry a ``jsonrpc`` member; older versions
    use explicit ``None`` placeholders instead.
    """

    def __init__(self, rpcid=None, version=None):
        if not version:
            version = config.version
        self.id = rpcid
        self.version = float(version)

    def request(self, method, params=[]):
        """Return a request dict for ``method``.

        A random id is generated when none was supplied. Raises
        ValueError when ``method`` is not a string.
        """
        if not isinstance(method, types.StringTypes):
            raise ValueError('Method name must be a string.')
        if not self.id:
            self.id = random_id()
        request = {'id': self.id, 'method': method}
        if params:
            request['params'] = params
        if self.version >= 2:
            request['jsonrpc'] = str(self.version)
        return request

    def notify(self, method, params=[]):
        """Return a notification dict for ``method``.

        Version 2 payloads omit ``id`` entirely; older versions carry
        ``id`` set to None.
        """
        request = self.request(method, params)
        if self.version >= 2:
            del request['id']
        else:
            request['id'] = None
        return request

    def response(self, result=None):
        """Return a success response dict wrapping ``result``."""
        response = {'result': result, 'id': self.id}
        if self.version >= 2:
            response['jsonrpc'] = str(self.version)
        else:
            response['error'] = None
        return response

    def error(self, code=-32000, message='Server error.'):
        """Return an error response dict with the given code and message."""
        error = self.response()
        if self.version >= 2:
            del error['result']
        else:
            error['result'] = None
        error['error'] = {'code': code, 'message': message}
        return error
def dumps(
        params=[], methodname=None, methodresponse=None,
        encoding=None, rpcid=None, version=None, notify=None):
    """
    This differs from the Python implementation in that it implements
    the rpcid argument since the 2.0 spec requires it for responses.

    Returns the JSON string for a request (``methodname`` given), a
    notification (``notify`` is True), a method response
    (``methodresponse`` is True, ``rpcid`` required) or an error
    response (``params`` is a Fault).

    Raises TypeError when a method call carries params that are not a
    dict, list, tuple or Fault, and ValueError when neither a method
    name nor ``methodresponse=True`` is supplied or a response lacks an
    rpcid.
    """
    if not version:
        version = config.version
    valid_params = (types.TupleType, types.ListType, types.DictType)
    # If a method, and params are not in a listish or a Fault,
    # error out. The method name has to be tested with isinstance():
    # a membership test against the tuple of string types never matches.
    if isinstance(methodname, types.StringTypes) and \
            not isinstance(params, valid_params) and \
            not isinstance(params, Fault):
        raise TypeError('Params must be a dict, list, tuple or Fault ' +
                        'instance.')
    # Begin parsing object
    payload = Payload(rpcid=rpcid, version=version)
    if not encoding:
        encoding = 'utf-8'
    if isinstance(params, Fault):
        response = payload.error(params.faultCode, params.faultString)
        return jdumps(response, encoding=encoding)
    if not isinstance(methodname, types.StringTypes) and \
            methodresponse is not True:
        raise ValueError(
            'Method name must be a string, or methodresponse must '
            'be set to True.')
    if config.use_jsonclass is True:
        from jsonrpclib import jsonclass
        params = jsonclass.dump(params)
    if methodresponse is True:
        if rpcid is None:
            raise ValueError('A method response must have an rpcid.')
        response = payload.response(params)
        return jdumps(response, encoding=encoding)
    if notify is True:
        request = payload.notify(methodname, params)
    else:
        request = payload.request(methodname, params)
    return jdumps(request, encoding=encoding)
def loads(data):
    """
    This differs from the Python implementation, in that it returns
    the request structure in Dict format instead of the method, params.
    It will return a list in the case of a batch request / response.

    An empty string (the reply to a notification) yields None.
    """
    if data == '':
        # notification
        return None
    result = jloads(data)
    # if the above raises an error, the implementing server code
    # should return something like the following:
    # { 'jsonrpc':'2.0', 'error': fault.error(), id: None }
    if config.use_jsonclass is True:
        from jsonrpclib import jsonclass
        result = jsonclass.load(result)
    return result
def check_for_errors(result):
    """Validate a decoded response and return it unchanged.

    Falsy values (notification replies) pass straight through. Raises
    TypeError for non-dict responses, NotImplementedError for a
    ``jsonrpc`` version above 2.0, ValueError when neither ``result``
    nor ``error`` is present, and ProtocolError carrying
    ``(code, message)`` when the response holds a non-None error.
    """
    if not result:
        # Notification
        return result
    if not isinstance(result, dict):
        raise TypeError('Response is not a dict.')
    if 'jsonrpc' in result and float(result['jsonrpc']) > 2.0:
        raise NotImplementedError('JSON-RPC version not yet supported.')
    has_result = 'result' in result
    has_error = 'error' in result
    if not (has_result or has_error):
        raise ValueError('Response does not have a result or error key.')
    if has_error and result['error'] is not None:
        failure = result['error']
        code = failure['code']
        message = failure['message']
        raise ProtocolError((code, message))
    return result
def isbatch(result):
    """Return True when ``result`` looks like a JSON-RPC 2.0 batch.

    A batch is a non-empty list or tuple whose first element is a dict
    with a ``jsonrpc`` member of at least 2. Raises ProtocolError when
    that member cannot be converted to a float.
    """
    if not isinstance(result, (list, tuple)):
        return False
    if len(result) < 1:
        return False
    first = result[0]
    if not isinstance(first, dict):
        return False
    if 'jsonrpc' not in first:
        return False
    try:
        version = float(first['jsonrpc'])
    except (TypeError, ValueError):
        # TypeError covers non-string, non-numeric values such as None.
        raise ProtocolError('"jsonrpc" key must be a float(able) value.')
    if version < 2:
        return False
    return True
def isnotification(request):
    """Return True when ``request`` is a notification.

    A request without an ``id`` member is a 2.0 notification; one whose
    ``id`` is None is a 1.0 notification.
    """
    missing_id = 'id' not in request      # 2.0 notification
    if missing_id or request['id'] is None:   # None id: 1.0 notification
        return True
    return False