Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

cleanup python client and add JSON serialization support to it

  • Loading branch information...
commit 2100b2050184ef354ff156fcb3319d4392af192d 1 parent edc7f3a
Jonathan Traupman authored
Showing with 2,352 additions and 533 deletions.
  1. +1 −0  build.properties
  2. +10 −10 build.xml
  3. +63 −0 clients/python/README
  4. +5 −0 clients/python/setup.cfg
  5. +15 −0 clients/python/setup.py
  6. +0 −59 clients/python/test.py
  7. +302 −0 clients/python/tests/test_client.py
  8. +11 −0 clients/python/tests/voldemort_config/config/cluster.xml
  9. +27 −0 clients/python/tests/voldemort_config/config/server.properties
  10. +32 −0 clients/python/tests/voldemort_config/config/stores.xml
  11. +0 −399 clients/python/voldemort.py
  12. +1 −0  clients/python/voldemort/__init__.py
  13. +478 −0 clients/python/voldemort/client.py
  14. 0  clients/python/voldemort/protocol/__init__.py
  15. 0  clients/python/{ → voldemort/protocol}/slop_pb2.py
  16. +65 −65 clients/python/{ → voldemort/protocol}/voldemort_admin_pb2.py
  17. 0  clients/python/{ → voldemort/protocol}/voldemort_client_pb2.py
  18. +8 −0 clients/python/voldemort/serialization/__init__.py
  19. +3 −0  clients/python/voldemort/serialization/common.py
  20. +1,252 −0 clients/python/voldemort/serialization/json_serializer.py
  21. +69 −0 clients/python/voldemort/serialization/ordered_dict.py
  22. +10 −0 clients/python/voldemort/serialization/string_serializer.py
View
1  build.properties
@@ -2,6 +2,7 @@
src.dir=src
java.dir=src/java
python.dir=clients/python
+python.proto.dir=clients/python/voldemort/protocol
protobuff.dir=src/proto
lib.dir=lib
classes.dir=dist/classes
View
20 build.xml
@@ -22,7 +22,7 @@
<isset property="env.BUILD_NUMBER" />
<not>
<equals arg1="" arg2="${env.BUILD_NUMBER}" trim="yes"/>
- </not>
+ </not>
</and>
</condition>
@@ -112,7 +112,7 @@
<fileset dir="${testclasses.dir}" />
</jar>
</target>
-
+
<target name="protobuff" description="Generate source files from .proto files">
<pathconvert property="proto.sources" pathsep=" ">
<path id="proto-files">
@@ -120,9 +120,9 @@
</path>
</pathconvert>
- <property name="proto.path" location="${protobuff.dir}"/>
+ <property name="proto.path" location="${protobuff.dir}"/>
<property name="javaout.path" location="${java.dir}"/>
- <property name="pythonout.path" location="${python.dir}"/>
+ <property name="pythonout.path" location="${python.proto.dir}"/>
<exec executable="protoc" failonerror="true">
<arg value="--proto_path=${proto.path}"/>
<arg value="--java_out=${javaout.path}"/>
@@ -151,7 +151,7 @@
</fileset>
</jar>
</target>
-
+
<target name="alljar" depends="build, contrib-build" description="Build a jar file that includes all contrib code.">
<jar destfile="${dist.dir}/${name}-${curr.release}-all.jar">
<fileset dir="${classes.dir}">
@@ -166,7 +166,7 @@
</fileset>
</jar>
</target>
-
+
<target name="war" depends="build" description="Build server war file">
<war destfile="${dist.dir}/${name}.war" webxml="web.xml" basedir="${classes.dir}">
<classes dir="${classes.dir}"/>
@@ -312,7 +312,7 @@
<report todir="${contribtesthtml.dir}" format="frames" />
</junitreport>
</target>
-
+
<macrodef name="create-release-artifacts">
<attribute name="version" />
<sequential>
@@ -340,12 +340,12 @@
<target name="snapshot" description="Create a release-snapshot zip file with everything pre-built.">
<create-release-artifacts version="${curr.release.snapshot}" />
</target>
-
+
<target name="release" description="Create a release zip file with everything pre-built.">
<create-release-artifacts version="${curr.release}" />
</target>
-
- <target name="hadoop-benchmark-jar" depends="build, contrib-build"
+
+ <target name="hadoop-benchmark-jar" depends="build, contrib-build"
description="Build a jar file that includes all contrib code plus the necessary jars for running the hadoop benchmark.">
<jar destfile="${dist.dir}/hadoop-benchmark.jar">
<fileset dir="${classes.dir}">
View
63 clients/python/README
@@ -0,0 +1,63 @@
+ABOUT
+
+This directory contains a pure Python implementation of a voldemort client.
+It supports both raw (string) and JSON serialized stores. Only the protocol
+buffer interface over TCP is supported for talking to the server. Only
+server-side routing is supported.
+
+INSTALLING
+
+To install the module, you will need the following dependencies:
+- nose >= 0.11
+- simplejson >= 2.1.1
+- Google protobuf > 2.3.0
+
+The setup process will automatically install nose and simplejson, since they
+are well behaved Python packages. The protobuf module will need to be downloaded
+from https://code.google.com/p/protobuf/downloads/list and installed manually.
+
+Once the dependencies, run the test suite to sanity check things. You need to
+first start up a Voldemort server locally, pointing to the config files in
+tests/voldemort_config. From the root voldemort of the voldemort source tree, run:
+
+> bin/voldemort-server clients/python/tests/voldemort_config
+
+In a separate shell, change into the clients/python directory and run:
+
+> python setup.py nosetests
+
+If all tests pass, you can install the package with the command:
+
+> python setup.py install
+
+This may need to be run as root if you don't have permissions to install to your
+local python library.
+
+USING THE MODULE
+
+To use the client, simple import it into your program with the statement:
+
+import voldemort
+
+To create a client connection, instantiate a StoreClient object:
+
+client = voldemort.StoreClient('store_name', [('node1', 6666), ('node2', 6666)])
+
+The values of the store name and cluster nodes/ports will depend on your particular
+Voldemort setup. The key and value serialization type will be determined
+automatically during client initialization using the values in your cluster's
+stores.xml file.
+
+The StoreClient object implements the get(), get_all(), put(), maybe_put(),
+and delete() methods. For example:
+
+> v1 = client.put("foo", "hello")
+> resp = client.get("foo")
+> resp[0][0]
+"hello"
+
+> client.delete("foo")
+> client.get("foo")
+[]
+
+The test suite contains many other usage examples.
View
5 clients/python/setup.cfg
@@ -0,0 +1,5 @@
+[nosetests]
+detailed-errors=1
+with-doctest=1
+
+
View
15 clients/python/setup.py
@@ -0,0 +1,15 @@
+from setuptools import setup
+
+setup(name='voldemort',
+ version='0.1',
+ description='Python Voldemort Client',
+ long_description=('Pure python client for accessing Voldemort key/value stores. ' +
+ 'Supports both raw and JSON stores. Only supports the tcp protocol ' +
+ 'buffer interface with server-side routing.'),
+ packages=['voldemort', 'voldemort.protocol', 'voldemort.serialization'],
+ author='LinkedIn Corporation',
+ license='Apache 2.0',
+ url='http://project-voldemort.com',
+ install_requires=['protobuf>=2.3.0', 'simplejson>=2.1.1'],
+ setup_requires=['nose>=0.11'],
+)
View
59 clients/python/test.py
@@ -1,59 +0,0 @@
-import logging
-import time
-from voldemort import StoreClient
-
-if __name__ == '__main__':
-
- logging.basicConfig(level=logging.INFO,)
-
- ## some random tests
- s = StoreClient('test', [('localhost', 6666)])
- version = s.put("hello", "1")
- assert s.get("hello")[0][0] == "1"
- s.put("hello", "2", version)
- assert s.get("hello")[0][0] == "2"
- s.put("hello", "3")
- assert s.get("hello")[0][0] == "3"
- s.delete("hello")
- assert len(s.get("hello")) == 0
-
- ## test get_all
- pairs = [("a1", "1"), ("a2", "2"), ("a3", "3"), ("a4", "4")]
- for k, v in pairs:
- s.put(k, v)
-
- vals = s.get_all([k for k, v in pairs])
- for k, v in pairs:
- assert vals[k][0][0] == v
-
- requests = 10000
-
- ## Time get requests
- s.put("hello", "world")
- start = time.time()
- for i in xrange(requests):
- s.get('hello')
- print requests/(time.time() - start), ' get requests per second'
-
- ## Time put requests
- version = s.put('abc', 'def')
- start = time.time()
- for i in xrange(requests):
- version = s.put('abc', 'def', version)
- print requests/(time.time() - start), ' put requests per second'
-
- ## Time get_all requests
- keys = [k for k,v in pairs]
- start = time.time()
- for i in xrange(requests):
- vals = s.get_all(keys)
- print requests/(time.time() - start), ' get_all requests per second'
-
- ## Time delete requests
- version = None
- for i in xrange(requests):
- version = s.put(str(i), str(i))
- start = time.time()
- for i in xrange(requests):
- vals = s.delete(str(i), version)
- print requests/(time.time() - start), ' delete requests per second'
View
302 clients/python/tests/test_client.py
@@ -0,0 +1,302 @@
+# Tests of the client code.
+#
+# To run these tests, you must have a local Voldemort server running using the configuration files
+# in tests/voldemort_config.
+
+import unittest
+import datetime
+
+from voldemort import StoreClient, VoldemortException
+
+def _vector_clock_equal(clock1, clock2):
+ """
+ Compares two vector clocks, ignoring the timestamp field, which may be skewed.
+ """
+
+ clock1_entries = dict((entry.node_id, entry.version) for entry in clock1.entries)
+ clock2_entries = dict((entry.node_id, entry.version) for entry in clock2.entries)
+ return clock1_entries == clock2_entries
+
+class VoldemortClientTest(unittest.TestCase):
+ def _reinit_raw_client(self):
+ s = StoreClient('test', [('localhost', 6666)])
+ for k in ['a', 'b', 'c']:
+ s.delete(k)
+ return s
+
+ def _reinit_json_client(self):
+ s = StoreClient('json_test', [('localhost', 6666)])
+ for k in [1, 2, 3]:
+ s.delete(k)
+ return s
+
+ def test_raw_get(self):
+ """
+ Tests basic puts/gets in raw (non-serialized) mode.
+ """
+
+ s = self._reinit_raw_client()
+
+ s.put('a', '1')
+ resp = s.get('a')
+ self.assertEquals(len(resp), 1)
+ self.assertEquals(len(resp[0]), 2)
+ self.assertEquals(resp[0][0], '1')
+
+ s.put('b', '2')
+ resp = s.get('b')
+ self.assertEquals(len(resp), 1)
+ self.assertEquals(len(resp[0]), 2)
+ self.assertEquals(resp[0][0], '2')
+
+
+ def test_raw_get_all(self):
+ """
+ Tests the get_all() method in raw mode.
+ """
+
+ s = self._reinit_raw_client()
+
+ pairs = [('a', '1'), ('b', '2'), ('c', '3')]
+ for k, v in pairs:
+ s.put(k, v)
+
+ resp = s.get_all([k for k, v in pairs])
+ self.assertEquals(len(resp), len(pairs))
+
+ for k, v in pairs:
+ self.assertTrue(k in resp)
+ self.assertEquals(len(resp[k]), 1)
+ self.assertEquals(len(resp[k][0]), 2)
+
+ self.assertEquals(resp[k][0][0], v)
+
+
+ def test_raw_versions(self):
+ """
+ Tests the put_maybe() method in raw mode.
+ """
+
+ s = self._reinit_raw_client()
+
+ v1 = s.put('a', '1')
+ self.assertTrue(v1 is not None)
+
+ v2 = s.put('a', '2')
+ self.assertTrue(v2 is not None)
+ self.assertFalse(_vector_clock_equal(v2, v1))
+
+ v3 = s.put('a', '3')
+ self.assertTrue(v3 is not None)
+ self.assertFalse(_vector_clock_equal(v3, v1))
+ self.assertFalse(_vector_clock_equal(v3, v2))
+
+ resp = s.get('a')
+ self.assertEquals(resp[0][0], '3')
+
+ # put() should fail because v2 is not the current version
+ self.assertRaises(VoldemortException, s.put, 'a', '4', version=v2)
+
+ # maybe_put() won't raise an exception, but will return None
+ v4 = s.maybe_put('a', '4', version=v2)
+ self.assertTrue(v4 is None)
+
+ # this put() should succeed
+ v4 = s.put('a', '4', version=v3)
+ self.assertTrue(v4 is not None)
+ self.assertFalse(_vector_clock_equal(v4, v1))
+ self.assertFalse(_vector_clock_equal(v4, v2))
+ self.assertFalse(_vector_clock_equal(v4, v2))
+
+ # and this maybe_put() should not return None
+ v5 = s.maybe_put('a', '5', version=v4)
+ self.assertTrue(v5 is not None)
+ self.assertFalse(_vector_clock_equal(v5, v1))
+ self.assertFalse(_vector_clock_equal(v5, v2))
+ self.assertFalse(_vector_clock_equal(v5, v3))
+ self.assertFalse(_vector_clock_equal(v5, v4))
+
+ # the value at the latest version should be "5"
+ resp = s.get('a')
+ self.assertEquals(resp[0][0], '5')
+ self.assertTrue(_vector_clock_equal(resp[0][1], v5))
+
+ # deleting old versions should have no effect
+ s.delete('a', version=v3)
+ resp = s.get('a')
+ self.assertEquals(len(resp), 1)
+ self.assertEquals(resp[0][0], '5')
+ self.assertTrue(_vector_clock_equal(resp[0][1], v5))
+
+ # deleting the current version should erase the entry
+ s.delete('a', version=v5)
+ resp = s.get('a')
+ self.assertEquals(resp, [])
+
+ val1 = {
+ 'a': 0.25,
+ 'b': [1,2,3],
+ 'c': u'foo',
+ 'd': { 'foo': True,
+ 'bar': datetime.datetime(2010, 11, 24, 20, 8, 7, 155000)
+ }
+ }
+
+ val2 = {
+ 'a': 4.0,
+ 'b': [5,6],
+ 'c': u'bar',
+ 'd': { 'foo': None,
+ 'bar': datetime.datetime(2003, 5, 5, 1, 23, 45, 678000)
+ }
+ }
+
+ val3 = {
+ 'a': 8.0,
+ 'b': [],
+ 'c': u'',
+ 'd': None
+ }
+
+ val4 = {
+ 'a': 4.0,
+ 'b': [5,6],
+ 'c': 'bar',
+ 'd': { 'foo': True,
+ 'bar': datetime.datetime(2003, 5, 5, 1, 23, 45, 678123)
+ }
+ }
+
+ def test_json_get(self):
+ """
+ Tests the JSON serialization with put()/get()
+ """
+
+ s = self._reinit_json_client()
+
+ s.put(1, self.val1)
+ resp = s.get(1)
+ self.assertEquals(len(resp), 1)
+ self.assertEquals(len(resp[0]), 2)
+ self.assertEquals(resp[0][0], self.val1)
+
+ s.put(2, self.val2)
+ resp = s.get(2)
+ self.assertEquals(len(resp), 1)
+ self.assertEquals(len(resp[0]), 2)
+ self.assertEquals(resp[0][0], self.val2)
+
+ def test_json_get_all(self):
+ """
+ Tests JSON serialized get_all()
+ """
+ s = self._reinit_json_client()
+
+ pairs = [(1, self.val1), (2, self.val2), (3, self.val3)]
+ for k, v in pairs:
+ s.put(k, v)
+
+ resp = s.get_all([k for k, v in pairs])
+ self.assertEquals(len(resp), len(pairs))
+
+ for k, v in pairs:
+ self.assertTrue(k in resp)
+ self.assertEquals(len(resp[k]), 1)
+ self.assertEquals(len(resp[k][0]), 2)
+
+ self.assertEquals(resp[k][0][0], v)
+
+ def test_json_mismatches(self):
+ """
+ Sometimes the result we get out of Voldemort is a little different than what
+ went in, but it's not always a problem.
+ """
+
+ s = self._reinit_json_client()
+
+ s.put(1, self.val4)
+ resp = s.get(1)
+ self.assertEquals(len(resp), 1)
+ self.assertEquals(len(resp[0]), 2)
+
+ output = resp[0][0]
+ # the input and output won't be the same
+ self.assertNotEquals(output, self.val4)
+
+ # the float should have survived the trip
+ self.assertEquals(output['a'], self.val4['a'])
+
+ # as should the list
+ self.assertEquals(output['b'], self.val4['b'])
+
+ # the string is now a unicode, but it should still compare equal to the original
+ self.assertTrue(isinstance(output['c'], unicode))
+ self.assertEquals(output['c'], self.val4['c'])
+
+ # the boolean should be the same
+ self.assertEquals(output['d']['foo'], self.val4['d']['foo'])
+
+ # but the date gets truncated:
+ self.assertNotEquals(output['d']['bar'], self.val4['d']['bar'])
+
+ # the difference should be small
+ td = self.val4['d']['bar'] - output['d']['bar']
+ self.assertEquals(td.days, 0)
+ self.assertEquals(td.seconds, 0)
+ self.assertEquals(td.microseconds, 123)
+
+ def test_raw_versions(self):
+ """
+ Tests versioning in JSON mode.
+ """
+
+ s = self._reinit_json_client()
+
+ v1 = s.put(1, self.val4)
+ self.assertTrue(v1 is not None)
+
+ v2 = s.put(1, self.val3)
+ self.assertTrue(v2 is not None)
+ self.assertFalse(_vector_clock_equal(v2, v1))
+
+ resp = s.get(1)
+ self.assertEquals(resp[0][0], self.val3)
+ self.assertTrue(_vector_clock_equal(resp[0][1], v2))
+
+ # put() should fail because v1 is not the current version
+ self.assertRaises(VoldemortException, s.put, 1, self.val2, version=v1)
+
+ # maybe_put() won't raise an exception, but will return None
+ v3 = s.maybe_put(1, self.val2, version=v1)
+ self.assertTrue(v3 is None)
+
+ # this put() should succeed
+ v3 = s.put(1, self.val2, version=v2)
+ self.assertTrue(v3 is not None)
+ self.assertFalse(_vector_clock_equal(v3, v1))
+ self.assertFalse(_vector_clock_equal(v3, v2))
+
+ # and this maybe_put() should not return None
+ v4 = s.maybe_put(1, self.val1, version=v3)
+ self.assertTrue(v4 is not None)
+ self.assertFalse(_vector_clock_equal(v4, v1))
+ self.assertFalse(_vector_clock_equal(v4, v2))
+ self.assertFalse(_vector_clock_equal(v4, v3))
+
+ # the value at the latest version should be val1 at version v4
+ resp = s.get(1)
+ self.assertEquals(resp[0][0], self.val1)
+ self.assertTrue(_vector_clock_equal(resp[0][1], v4))
+
+ # deleting old versions should have no effect
+ s.delete(1, version=v2)
+ resp = s.get(1)
+ self.assertEquals(len(resp), 1)
+ self.assertEquals(resp[0][0], self.val1)
+ self.assertTrue(_vector_clock_equal(resp[0][1], v4))
+
+ # deleting the current version should erase the entry
+ s.delete(1, version=v4)
+ resp = s.get(1)
+ self.assertEquals(resp, [])
+
View
11 clients/python/tests/voldemort_config/config/cluster.xml
@@ -0,0 +1,11 @@
+<cluster>
+ <name>mycluster</name>
+ <server>
+ <id>0</id>
+ <host>localhost</host>
+ <http-port>8081</http-port>
+ <socket-port>6666</socket-port>
+ <partitions>0, 1</partitions>
+ </server>
+</cluster>
+
View
27 clients/python/tests/voldemort_config/config/server.properties
@@ -0,0 +1,27 @@
+# The ID of *this* particular cluster node
+node.id=0
+
+max.threads=100
+
+############### DB options ######################
+
+http.enable=true
+socket.enable=true
+
+# BDB
+bdb.write.transactions=false
+bdb.flush.transactions=false
+bdb.cache.size=1G
+
+# Mysql
+mysql.host=localhost
+mysql.port=1521
+mysql.user=root
+mysql.password=3306
+mysql.database=test
+
+#NIO connector settings.
+enable.nio.connector=false
+
+request.format=vp3
+storage.configs=voldemort.store.bdb.BdbStorageConfiguration, voldemort.store.readonly.ReadOnlyStorageConfiguration
View
32 clients/python/tests/voldemort_config/config/stores.xml
@@ -0,0 +1,32 @@
+<stores>
+ <store>
+ <name>test</name>
+ <persistence>bdb</persistence>
+ <routing>client</routing>
+ <replication-factor>1</replication-factor>
+ <required-reads>1</required-reads>
+ <required-writes>1</required-writes>
+ <key-serializer>
+ <type>string</type>
+ </key-serializer>
+ <value-serializer>
+ <type>string</type>
+ </value-serializer>
+ </store>
+ <store>
+ <name>json_test</name>
+ <persistence>bdb</persistence>
+ <routing>client</routing>
+ <replication-factor>1</replication-factor>
+ <required-reads>1</required-reads>
+ <required-writes>1</required-writes>
+ <key-serializer>
+ <type>json</type>
+ <schema-info version="0">"int32"</schema-info>
+ </key-serializer>
+ <value-serializer>
+ <type>json</type>
+ <schema-info version="0">{ "a":"float32", "b":["int16"], "c":"string", "d":{ "foo":"boolean", "bar":"date" }}</schema-info>
+ </value-serializer>
+ </store>
+</stores>
View
399 clients/python/voldemort.py
@@ -1,399 +0,0 @@
-import socket
-import struct
-import time
-import sys
-import re
-import random
-import logging
-import sets
-
-import voldemort_client_pb2 as protocol
-from xml.dom import minidom
-from datetime import datetime
-
-##################################################################
-# A Voldemort client. Each client uses a single connection to one
-# Voldemort server. All routing is done server-side.
-##################################################################
-
-
-## Extract all the child text of the given element
-def _extract_text(elm):
- if elm.nodeType == minidom.Node.TEXT_NODE:
- return elm.data
- elif elm.nodeType == minidom.Node.ELEMENT_NODE:
- text = ""
- for child in elm.childNodes:
- text += _extract_text(child)
- return text
-
-
-## Get a single child from the element, if there are multiple children, explode.
-def _child(elmt, name):
- children = elmt.getElementsByTagName(name)
- if len(children) != 1:
- raise Exception, "No child '" + str(name) + "' for element " + str(elmt.nodeName)
- return children[0]
-
-
-## Get the child text of a single element
-def _child_text(elmt, name):
- return _extract_text(_child(elmt, name))
-
-
-
-##################################################################
-# A node class representing a single voldemort server in the
-# cluster. The cluster itself is just a list of nodes
-##################################################################
-class Node:
- """A Voldemort node with the appropriate host and port information for contacting that node"""
-
- def __init__(self, id, host, socket_port, http_port, partitions, is_available = True, last_contact = None):
- self.id = id
- self.host = host
- self.socket_port = socket_port
- self.http_port = http_port
- self.partitions = partitions
- self.is_available = True
- if not last_contact:
- self.last_contact = time.clock()
-
- def __repr__(self):
- return 'node(id = ' + str(self.id) + ', host = ' + self.host + ', socket_port = ' + str(self.socket_port) + \
- ', http_port = ' + str(self.http_port) + ', partitions = ' + ', '.join(map(str, self.partitions)) + ')'
-
- @staticmethod
- def parse_cluster(xml):
- """Parse the cluster.xml file and return a dictionary of the nodes in the cluster indexed by node id """
- doc = minidom.parseString(xml)
- nodes = {}
- for curr in doc.getElementsByTagName('server'):
- id = int(_child_text(curr, 'id'))
- host = _child_text(curr, 'host')
- http_port = int(_child_text(curr, 'http-port'))
- socket_port = int(_child_text(curr, 'socket-port'))
- partition_str = _child_text(curr, 'partitions')
- partitions = [int(p) for p in re.split('[\s,]+', partition_str)]
- nodes[id] = Node(id = id, host = host, socket_port = socket_port, http_port = http_port, partitions = partitions)
- return nodes
-
-
-
-class VoldemortException(Exception):
- def __init__(self, message, code = 1):
- self.code = code
- self.message = message
-
- def __str__(self):
- return repr(self.message)
-
-
-
-class StoreClient:
- """A simple Voldemort client. It is single-threaded and supports only server-side routing."""
-
- def __init__(self, store_name, bootstrap_urls, reconnect_interval = 500, conflict_resolver = None):
- self.store_name = store_name
- self.request_count = 0
- self.conflict_resolver = conflict_resolver
- self.nodes = self._bootstrap_metadata(bootstrap_urls)
- self.node_id = random.randint(0, len(self.nodes) - 1)
- self.node_id, self.connection = self._reconnect()
- self.reconnect_interval = reconnect_interval
- self.open = True
-
- def _make_connection(self, host, port):
- protocol = 'pb0'
- logging.debug('Attempting to connect to ' + host + ':' + str(port))
- connection = socket.socket()
- connection.connect((host, port))
- logging.debug('Connection succeeded, negotiating protocol')
- connection.send(protocol)
- resp = connection.recv(2)
- if resp != 'ok':
- raise VoldemortException('Server does not understand the protocol ' + protocol)
- logging.debug('Protocol negotiation suceeded')
- return connection
-
-
- ## Connect to a the next available node in the cluster
- ## returns a tuple of (node_id, connection)
- def _reconnect(self):
- num_nodes = len(self.nodes)
- attempts = 0
- new_node_id = self.node_id
- while attempts < num_nodes:
- new_node_id = (new_node_id + 1) % num_nodes
- new_node = self.nodes[new_node_id]
- connection = None
- try:
- connection = self._make_connection(new_node.host, new_node.socket_port)
- self.request_count = 0
- return new_node_id, connection
- except socket.error, (err_num, message):
- self._close_socket(connection)
- logging.warn('Error connecting to node ' + str(new_node_id) + ': ' + message)
- attempts += 1
-
- # If we get here all nodes have failed us, explode
- raise VoldemortException('Connections to all nodes failed.')
-
-
- ## Safely close the socket, catching and logging any exceptions
- def _close_socket(self, socket):
- try:
- if socket:
- socket.close()
- except socket.error, exp:
- logging.error('Error while closing socket: ' + str(exp))
-
-
- ## Check if the the number of requests made on this connection is greater than the reconnect interval.
- ## If so reconnect to a random node in the cluster. No attempt is made at preventing the reconnecting
- ## from going back to the same node
- def _maybe_reconnect(self):
- if self.request_count >= self.reconnect_interval:
- logging.debug('Completed ' + str(self.request_count) + ' requests using this connection, reconnecting...')
- self._close_socket(self.connection)
- self.node_id, self.connection = self._reconnect()
-
-
- ## send a request to the server using the given connection
- def _send_request(self, connection, req_bytes):
- connection.send(struct.pack('>i', len(req_bytes)) + req_bytes)
- self.request_count += 1
-
-
- ## read a response from the connection
- def _receive_response(self, connection):
- size_bytes = connection.recv(4)
- size = struct.unpack('>i', size_bytes)
-
- if not size[0]:
- return ''
-
- return connection.recv(size[0])
-
- ## Bootstrap cluster metadata from a list of urls of nodes in the cluster.
- ## The urls are tuples in the form (host, port).
- ## A dictionary of node_id => node is returned.
- def _bootstrap_metadata(self, bootstrap_urls):
- random.shuffle(bootstrap_urls)
- for host, port in bootstrap_urls:
- logging.debug('Attempting to bootstrap metadata from ' + host + ':' + str(port))
- connection = None
- try:
- connection = self._make_connection(host, port)
- cluster_xmls = self._get_with_connection(connection, 'metadata', 'cluster.xml', should_route = False)
- if len(cluster_xmls) != 1:
- raise VoldemortException('Expected exactly one version of the metadata but found ' + str(cluster_xmls))
- nodes = Node.parse_cluster(cluster_xmls[0][0])
- logging.debug('Bootstrap from ' + host + ':' + str(port) + ' succeeded, found ' + str(len(nodes)) + " nodes.")
- return nodes
- except socket.error, (err_num, message):
- logging.warn('Metadata bootstrap from ' + host + ':' + str(port) + " failed: " + message)
- finally:
- self._close_socket(connection)
- raise VoldemortException('All bootstrap attempts failed')
-
-
- ## check if the server response has an error, if so throw an exception
- def _check_error(self, resp):
- if resp._has_error:
- raise VoldemortException(resp.error.error_message, resp.error.error_code)
-
-
- ## Increment the version for a vector clock
- def _increment(self, clock):
- # See if we already have a version for this guy, if so increment it
- for entry in clock.entries:
- if entry.node_id == self.node_id:
- entry.version += 1
- return
- # Otherwise add a version
- entry = clock.entries.add()
- entry.node_id = self.node_id
- entry.version = 1
- clock.timestamp = int(time.time() * 1000)
-
-
- ## Take a list of versions, and, if a conflict resolver has been given, resolve any conflicts that can be resolved
- def _resolve_conflicts(self, versions):
- if self.conflict_resolver and versions:
- return self.conflict_resolver(versions)
- else:
- return versions
-
-
- ## Turn a protocol buffer list of versioned items into a python list of items
- def _extract_versions(self, pb_versioneds):
- versions = []
- for versioned in pb_versioneds:
- versions.append((versioned.value, versioned.version))
- return self._resolve_conflicts(versions)
-
- ## A basic request wrapper, that handles reconnection logic and failures
- def _execute_request(self, fun, args):
- assert self.open, 'Store has been closed.'
- self._maybe_reconnect()
-
- failures = 0
- num_nodes = len(self.nodes)
- while failures < num_nodes:
- try:
- return apply(fun, args)
- except socket.error, (err_num, message):
- logging.warn('Error while performing ' + fun.__name__ + ' on node ' + str(self.node_id) + ': ' + message)
- self._reconnect()
- failures += 1
- raise VoldemortException('All nodes are down, ' + fun.__name__ + ' failed.')
-
-
- ## An internal get function that take the connection and store name as parameters. This is
- ## used by both the public get() method and also the metadata bootstrap process
- def _get_with_connection(self, connection, store_name, key, should_route):
- """Execute get request to the given store. Returns a (value, version) pair."""
-
- req = protocol.VoldemortRequest()
- req.should_route = should_route
- req.store = store_name
- req.type = protocol.GET
- req.get.key = key
-
- # send request
- self._send_request(connection, req.SerializeToString())
-
- # read and parse response
- resp_str = self._receive_response(connection)
- resp = protocol.GetResponse()
- resp.ParseFromString(resp_str)
- self._check_error(resp)
-
- return self._extract_versions(resp.versioned)
-
-
- ## Inner helper function for get
- def _get(self, key):
- return self._get_with_connection(self.connection, self.store_name, key, True)
-
-
- def get(self, key):
- """Execute a get request. Returns a list of (value, version) pairs."""
- return self._execute_request(self._get, [key])
-
-
- ## Inner get_all method that takes the connection and store_name as parameters
- def _get_all(self, keys):
- req = protocol.VoldemortRequest()
- req.should_route = True
- req.store = self.store_name
- req.type = protocol.GET_ALL
- for key in keys:
- req.getAll.keys.append(key)
-
- # send request
- self._send_request(self.connection, req.SerializeToString())
-
- # read and parse response
- resp_str = self._receive_response(self.connection)
- resp = protocol.GetAllResponse()
- resp.ParseFromString(resp_str)
- self._check_error(resp)
- values = {}
- for key_val in resp.values:
- values[key_val.key] = self._extract_versions(key_val.versions)
- return values
-
-
- def get_all(self, keys):
- """Execute get request for multiple keys given as a list or tuple.
- Returns a dictionary of key => [(value, version), ...] pairs."""
- return self._execute_request(self._get_all, [keys])
-
-
- ## Get the current version of the given key by doing a get request to the store
- def _fetch_version(self, key):
- versioned = self.get(key)
- if versioned:
- version = versioned[0][1]
- else:
- version = protocol.VectorClock()
- version.timestamp = int(time.time() * 1000)
- return version
-
-
- def _put(self, key, value, version):
- req = protocol.VoldemortRequest()
- req.should_route = True
- req.store = self.store_name
- req.type = protocol.PUT
- req.put.key = key
- req.put.versioned.value = value
- req.put.versioned.version.MergeFrom(version)
-
- # send request
- self._send_request(self.connection, req.SerializeToString())
-
- # read and parse response
- resp_str = self._receive_response(self.connection)
- resp = protocol.PutResponse()
- resp.ParseFromString(resp_str)
- self._check_error(resp)
- self._increment(version)
- return version
-
- def put (self, key, value, version = None):
- """Execute a put request using the given key and value. If no version is specified a get(key) request
- will be done to get the current version. The updated version is returned."""
- # if we don't have a version, fetch one
- if not version:
- version = self._fetch_version(key)
- return self._execute_request(self._put, [key, value, version])
-
-
- def maybe_put(self, key, value, version = None):
- """Execute a put request using the given key and value. If the version being put is obsolete,
- no modification will be made and this function will return None. Otherwise it will return the new version."""
- try:
- return self.put(key, value, version)
- except:
- return None
-
- def _delete(self, key, version):
- req = protocol.VoldemortRequest()
- req.should_route = True
- req.store = self.store_name
- req.type = protocol.DELETE
- req.delete.key = key
- req.delete.version.MergeFrom(version)
-
- # send request
- self._send_request(self.connection, req.SerializeToString())
-
- # read and parse response
- resp_str = self._receive_response(self.connection)
- resp = protocol.DeleteResponse()
- resp.ParseFromString(resp_str)
- self._check_error(resp)
-
- return resp.success
-
-
- def delete(self, key, version = None):
- """Execute a delete request, deleting all keys up to and including the given version.
- If no version is given a get(key) request will be done to find the latest version."""
- # if we don't have a version, fetch one
- if version == None:
- version = self._fetch_version(key)
- return self._execute_request(self._delete, [key, version])
-
-
- def close(self):
- """Close the connection this store maintains."""
- self.open = False
- self.connection.close()
-
-
-
-
View
1  clients/python/voldemort/__init__.py
@@ -0,0 +1 @@
+from client import StoreClient, VoldemortException
View
478 clients/python/voldemort/client.py
@@ -0,0 +1,478 @@
+import socket
+import struct
+import time
+import sys
+import re
+import random
+import logging
+
+import protocol.voldemort_client_pb2 as protocol
+from xml.dom import minidom
+from datetime import datetime
+
+import serialization
+
+##################################################################
+# A Voldemort client. Each client uses a single connection to one
+# Voldemort server. All routing is done server-side.
+##################################################################
+
+
+## Extract all the child text of the given element
+def _extract_text(elm):
+ if elm.nodeType == minidom.Node.TEXT_NODE:
+ return elm.data
+ elif elm.nodeType == minidom.Node.ELEMENT_NODE:
+ text = ""
+ for child in elm.childNodes:
+ text += _extract_text(child)
+ return text
+
+
+## Get a single child from the element, if there are multiple children, explode.
+def _child(elmt, name, required=True):
+ children = elmt.getElementsByTagName(name)
+ if not children:
+ if required:
+ raise VoldemortException("No child '%s' for element '%s'." % (name, elmt.nodeName))
+ else:
+ return None
+
+ if len(children) > 1:
+ raise VoldemortException("Multiple children '%s' for element '%s'." % (name, elmt.nodeName))
+ return children[0]
+
+
+## Get the child text of a single element
+def _child_text(elmt, name, required=True, default=None):
+ if default:
+ required = False
+
+ child = _child(elmt, name, required=required)
+ if not child:
+ return default
+
+ return _extract_text(child)
+
+
+def _int_or_none(s):
+ if s is None:
+ return s
+ return int(s)
+
+
+##################################################################
+# A node class representing a single voldemort server in the
+# cluster. The cluster itself is just a list of nodes
+##################################################################
+class Node:
+ """A Voldemort node with the appropriate host and port information for contacting that node"""
+
+ def __init__(self, id, host, socket_port, http_port, partitions, is_available = True, last_contact = None):
+ self.id = id
+ self.host = host
+ self.socket_port = socket_port
+ self.http_port = http_port
+ self.partitions = partitions
+ self.is_available = True
+ if not last_contact:
+ self.last_contact = time.clock()
+
+ def __repr__(self):
+ return 'node(id = ' + str(self.id) + ', host = ' + self.host + ', socket_port = ' + str(self.socket_port) + \
+ ', http_port = ' + str(self.http_port) + ', partitions = ' + ', '.join(map(str, self.partitions)) + ')'
+
+ @staticmethod
+ def parse_cluster(xml):
+ """Parse the cluster.xml file and return a dictionary of the nodes in the cluster indexed by node id """
+ doc = minidom.parseString(xml)
+ nodes = {}
+ for curr in doc.getElementsByTagName('server'):
+ id = int(_child_text(curr, 'id'))
+ host = _child_text(curr, 'host')
+ http_port = int(_child_text(curr, 'http-port'))
+ socket_port = int(_child_text(curr, 'socket-port'))
+ partition_str = _child_text(curr, 'partitions')
+ partitions = [int(p) for p in re.split('[\s,]+', partition_str)]
+ nodes[id] = Node(id = id, host = host, socket_port = socket_port, http_port = http_port, partitions = partitions)
+ return nodes
+
+
+class Store:
+ def __init__(self, store_node):
+ self.name = _child_text(store_node, "name")
+ self.persistence = _child_text(store_node, "persistence")
+ self.routing = _child_text(store_node, "routing")
+ self.routing_strategy = _child_text(store_node, "routing-strategy", default="consistent-routing")
+ self.replication_factor = int(_child_text(store_node, "replication-factor"))
+ self.required_reads = int(_child_text(store_node, "required-reads"))
+ self.preferred_reads = _int_or_none(_child_text(store_node, "preferred-reads", required=False))
+ self.required_writes = int(_child_text(store_node, "required-writes"))
+ self.preferred_writes = _int_or_none(_child_text(store_node, "preferred-writes", required=False))
+
+ key_serializer_node = _child(store_node, "key-serializer")
+ self.key_serializer_type = _child_text(key_serializer_node, "type")
+ self.key_serializer = self._create_serializer(self.key_serializer_type, key_serializer_node)
+
+ value_serializer_node = _child(store_node, "value-serializer")
+ self.value_serializer_type = _child_text(value_serializer_node, "type")
+ self.value_serializer = self._create_serializer(self.value_serializer_type, value_serializer_node)
+
+ def _create_serializer(self, serializer_type, serializer_node):
+ if serializer_type not in serialization.SERIALIZER_CLASSES:
+ raise VoldemortException("Unknown serializer type: %s" % serializer_type)
+
+ return serialization.SERIALIZER_CLASSES[serializer_type].create_from_xml(serializer_node)
+
+ @staticmethod
+ def parse_stores_xml(xml):
+ doc = minidom.parseString(xml)
+ stores = [Store(store_node) for store_node in doc.getElementsByTagName("store")]
+ return dict((store.name, store) for store in stores)
+
+class VoldemortException(Exception):
+ def __init__(self, msg, code = 1):
+ self.code = code
+ self.msg = msg
+
+ def __str__(self):
+ return repr(self.msg)
+
+
+class StoreClient:
+ """A simple Voldemort client. It is single-threaded and supports only server-side routing."""
+
+ def __init__(self, store_name, bootstrap_urls, reconnect_interval = 500, conflict_resolver = None):
+ self.store_name = store_name
+ self.request_count = 0
+ self.conflict_resolver = conflict_resolver
+ self.nodes, self.stores = self._bootstrap_metadata(bootstrap_urls)
+ self.node_id = random.randint(0, len(self.nodes) - 1)
+ self.node_id, self.connection = self._reconnect()
+ self.reconnect_interval = reconnect_interval
+ self.open = True
+ self.key_serializer = self.stores[store_name].key_serializer
+ self.value_serializer = self.stores[store_name].value_serializer
+
+ def _make_connection(self, host, port):
+ protocol = 'pb0'
+ logging.debug('Attempting to connect to ' + host + ':' + str(port))
+ connection = socket.socket()
+ connection.connect((host, port))
+ logging.debug('Connection succeeded, negotiating protocol')
+ connection.send(protocol)
+ resp = connection.recv(2)
+ if resp != 'ok':
+ raise VoldemortException('Server does not understand the protocol ' + protocol)
+ logging.debug('Protocol negotiation suceeded')
+ return connection
+
+
+ ## Connect to a the next available node in the cluster
+ ## returns a tuple of (node_id, connection)
+ def _reconnect(self):
+ num_nodes = len(self.nodes)
+ attempts = 0
+ new_node_id = self.node_id
+ while attempts < num_nodes:
+ new_node_id = (new_node_id + 1) % num_nodes
+ new_node = self.nodes[new_node_id]
+ connection = None
+ try:
+ connection = self._make_connection(new_node.host, new_node.socket_port)
+ self.request_count = 0
+ return new_node_id, connection
+ except socket.error, (err_num, message):
+ self._close_socket(connection)
+ logging.warn('Error connecting to node ' + str(new_node_id) + ': ' + message)
+ attempts += 1
+
+ # If we get here all nodes have failed us, explode
+ raise VoldemortException('Connections to all nodes failed.')
+
+
+ ## Safely close the socket, catching and logging any exceptions
+ def _close_socket(self, socket):
+ try:
+ if socket:
+ socket.close()
+ except socket.error, exp:
+ logging.error('Error while closing socket: ' + str(exp))
+
+
+ ## Check if the the number of requests made on this connection is greater than the reconnect interval.
+ ## If so reconnect to a random node in the cluster. No attempt is made at preventing the reconnecting
+ ## from going back to the same node
+ def _maybe_reconnect(self):
+ if self.request_count >= self.reconnect_interval:
+ logging.debug('Completed ' + str(self.request_count) + ' requests using this connection, reconnecting...')
+ self._close_socket(self.connection)
+ self.node_id, self.connection = self._reconnect()
+
+
+ ## send a request to the server using the given connection
+ def _send_request(self, connection, req_bytes):
+ connection.send(struct.pack('>i', len(req_bytes)) + req_bytes)
+ self.request_count += 1
+
+
+ ## read a response from the connection
+ def _receive_response(self, connection):
+ size_bytes = connection.recv(4)
+ size = struct.unpack('>i', size_bytes)[0]
+
+ bytes_read = 0
+ data = []
+
+ while size and bytes_read < size:
+ chunk = connection.recv(size - bytes_read)
+ bytes_read += len(chunk)
+ data.append(chunk)
+
+ return ''.join(data)
+
+
+ ## Bootstrap cluster metadata from a list of urls of nodes in the cluster.
+ ## The urls are tuples in the form (host, port).
+ ## A dictionary of node_id => node is returned.
+ def _bootstrap_metadata(self, bootstrap_urls):
+ random.shuffle(bootstrap_urls)
+ for host, port in bootstrap_urls:
+ logging.debug('Attempting to bootstrap metadata from ' + host + ':' + str(port))
+ connection = None
+ try:
+ connection = self._make_connection(host, port)
+ cluster_xmls = self._get_with_connection(connection, 'metadata', 'cluster.xml', should_route = False)
+ if len(cluster_xmls) != 1:
+ raise VoldemortException('Expected exactly one version of the metadata but found ' + str(cluster_xmls))
+ nodes = Node.parse_cluster(cluster_xmls[0][0])
+ logging.debug('Bootstrap from ' + host + ':' + str(port) + ' succeeded, found ' + str(len(nodes)) + " nodes.")
+ stores_xml = self._get_with_connection(connection, 'metadata', 'stores.xml', should_route=False)[0][0]
+ stores = Store.parse_stores_xml(stores_xml)
+
+ return nodes, stores
+ except socket.error, (err_num, message):
+ logging.warn('Metadata bootstrap from ' + host + ':' + str(port) + " failed: " + message)
+ finally:
+ self._close_socket(connection)
+ raise VoldemortException('All bootstrap attempts failed')
+
+
+ ## check if the server response has an error, if so throw an exception
+ def _check_error(self, resp):
+ if resp.error and resp.error.error_code != 0:
+ raise VoldemortException(resp.error.error_message, resp.error.error_code)
+
+
+ ## Increment the version for a vector clock
+ def _increment(self, clock):
+ new_clock = protocol.VectorClock()
+ new_clock.MergeFrom(clock)
+
+ # See if we already have a version for this guy, if so increment it
+ for entry in new_clock.entries:
+ if entry.node_id == self.node_id:
+ entry.version += 1
+ return new_clock
+
+ # Otherwise add a version
+ entry = new_clock.entries.add()
+ entry.node_id = self.node_id
+ entry.version = 1
+ new_clock.timestamp = int(time.time() * 1000)
+
+ return new_clock
+
+ ## Take a list of versions, and, if a conflict resolver has been given, resolve any conflicts that can be resolved
+ def _resolve_conflicts(self, versions):
+ if self.conflict_resolver and versions:
+ return self.conflict_resolver(versions)
+ else:
+ return versions
+
+
+ ## Turn a protocol buffer list of versioned items into a python list of items
+ def _extract_versions(self, pb_versioneds):
+ versions = []
+ for versioned in pb_versioneds:
+ versions.append((versioned.value, versioned.version))
+ return self._resolve_conflicts(versions)
+
+ ## A basic request wrapper, that handles reconnection logic and failures
+ def _execute_request(self, fun, args):
+ assert self.open, 'Store has been closed.'
+ self._maybe_reconnect()
+
+ failures = 0
+ num_nodes = len(self.nodes)
+ while failures < num_nodes:
+ try:
+ return apply(fun, args)
+ except socket.error, (err_num, message):
+ logging.warn('Error while performing ' + fun.__name__ + ' on node ' + str(self.node_id) + ': ' + message)
+ self._reconnect()
+ failures += 1
+ raise VoldemortException('All nodes are down, ' + fun.__name__ + ' failed.')
+
+
+ ## An internal get function that take the connection and store name as parameters. This is
+ ## used by both the public get() method and also the metadata bootstrap process
+ def _get_with_connection(self, connection, store_name, key, should_route):
+ """Execute get request to the given store. Returns a (value, version) pair."""
+
+ req = protocol.VoldemortRequest()
+ req.should_route = should_route
+ req.store = store_name
+ req.type = protocol.GET
+ req.get.key = key
+
+ # send request
+ self._send_request(connection, req.SerializeToString())
+
+ # read and parse response
+ resp_str = self._receive_response(connection)
+ resp = protocol.GetResponse()
+ resp.ParseFromString(resp_str)
+ self._check_error(resp)
+
+ return self._extract_versions(resp.versioned)
+
+
+ ## Inner helper function for get
+ def _get(self, key):
+ return self._get_with_connection(self.connection, self.store_name, key, True)
+
+
+ def get(self, key):
+ """Execute a get request. Returns a list of (value, version) pairs."""
+
+ raw_key = self.key_serializer.writes(key)
+ return [(self.value_serializer.reads(value), version)
+ for value, version in self._execute_request(self._get, [raw_key])]
+
+
+ ## Inner get_all method that takes the connection and store_name as parameters
+ def _get_all(self, keys):
+ req = protocol.VoldemortRequest()
+ req.should_route = True
+ req.store = self.store_name
+ req.type = protocol.GET_ALL
+ for key in keys:
+ req.getAll.keys.append(key)
+
+ # send request
+ self._send_request(self.connection, req.SerializeToString())
+
+ # read and parse response
+ resp_str = self._receive_response(self.connection)
+ resp = protocol.GetAllResponse()
+ resp.ParseFromString(resp_str)
+ self._check_error(resp)
+ values = {}
+ for key_val in resp.values:
+ values[key_val.key] = self._extract_versions(key_val.versions)
+ return values
+
+
+ def get_all(self, keys):
+ """Execute get request for multiple keys given as a list or tuple.
+ Returns a dictionary of key => [(value, version), ...] pairs."""
+
+ raw_keys = [self.key_serializer.writes(key) for key in keys]
+ return dict((self.key_serializer.reads(key), [(self.value_serializer.reads(value), version)
+ for value, version in versioned_values])
+ for key, versioned_values in self._execute_request(self._get_all, [raw_keys]).iteritems())
+
+
+ ## Get the current version of the given key by doing a get request to the store
+ def _fetch_version(self, key):
+ versioned = self.get(key)
+ if versioned:
+ version = versioned[0][1]
+ else:
+ version = protocol.VectorClock()
+ version.timestamp = int(time.time() * 1000)
+ return version
+
+
+ def _put(self, key, value, version):
+ req = protocol.VoldemortRequest()
+ req.should_route = True
+ req.store = self.store_name
+ req.type = protocol.PUT
+ req.put.key = key
+ req.put.versioned.value = value
+ req.put.versioned.version.MergeFrom(version)
+
+ # send request
+ self._send_request(self.connection, req.SerializeToString())
+
+ # read and parse response
+ resp_str = self._receive_response(self.connection)
+ resp = protocol.PutResponse()
+ resp.ParseFromString(resp_str)
+ self._check_error(resp)
+ return self._increment(version)
+
+
+ def put(self, key, value, version = None):
+ """Execute a put request using the given key and value. If no version is specified a get(key) request
+ will be done to get the current version. The updated version is returned."""
+
+ raw_key = self.key_serializer.writes(key)
+ raw_value = self.value_serializer.writes(value)
+
+ # if we don't have a version, fetch one
+ if not version:
+ version = self._fetch_version(key)
+ return self._execute_request(self._put, [raw_key, raw_value, version])
+
+
+ def maybe_put(self, key, value, version = None):
+ """Execute a put request using the given key and value. If the version being put is obsolete,
+ no modification will be made and this function will return None. Otherwise it will return the new version."""
+ try:
+ return self.put(key, value, version)
+ except:
+ return None
+
+ def _delete(self, key, version):
+ req = protocol.VoldemortRequest()
+ req.should_route = True
+ req.store = self.store_name
+ req.type = protocol.DELETE
+ req.delete.key = key
+ req.delete.version.MergeFrom(version)
+
+ # send request
+ self._send_request(self.connection, req.SerializeToString())
+
+ # read and parse response
+ resp_str = self._receive_response(self.connection)
+ resp = protocol.DeleteResponse()
+ resp.ParseFromString(resp_str)
+ self._check_error(resp)
+
+ return resp.success
+
+
+ def delete(self, key, version = None):
+ """Execute a delete request, deleting all keys up to and including the given version.
+ If no version is given a get(key) request will be done to find the latest version."""
+
+ raw_key = self.key_serializer.writes(key)
+
+ # if we don't have a version, fetch one
+ if version == None:
+ version = self._fetch_version(key)
+ return self._execute_request(self._delete, [raw_key, version])
+
+
+ def close(self):
+ """Close the connection this store maintains."""
+ self.open = False
+ self.connection.close()
+
+
+
+
View
0  clients/python/voldemort/protocol/__init__.py
No changes.
View
0  clients/python/slop_pb2.py → clients/python/voldemort/protocol/slop_pb2.py
File renamed without changes
View
130 clients/python/voldemort_admin_pb2.py → .../python/voldemort/protocol/voldemort_admin_pb2.py
@@ -10,7 +10,7 @@
DESCRIPTOR = descriptor.FileDescriptor(
name='voldemort-admin.proto',
package='voldemort',
- serialized_pb='\n\x15voldemort-admin.proto\x12\tvoldemort\x1a\x16voldemort-client.proto\"!\n\x12GetMetadataRequest\x12\x0b\n\x03key\x18\x01 \x02(\x0c\"]\n\x13GetMetadataResponse\x12%\n\x07version\x18\x01 \x01(\x0b\x32\x14.voldemort.Versioned\x12\x1f\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.voldemort.Error\"M\n\x15UpdateMetadataRequest\x12\x0b\n\x03key\x18\x01 \x02(\x0c\x12\'\n\tversioned\x18\x02 \x02(\x0b\x32\x14.voldemort.Versioned\"9\n\x16UpdateMetadataResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"7\n\tFileEntry\x12\x11\n\tfile_name\x18\x01 \x02(\t\x12\x17\n\x0f\x66ile_size_bytes\x18\x02 \x02(\x03\"F\n\x0ePartitionEntry\x12\x0b\n\x03key\x18\x01 \x02(\x0c\x12\'\n\tversioned\x18\x02 \x02(\x0b\x32\x14.voldemort.Versioned\"\x8e\x01\n\x1dUpdatePartitionEntriesRequest\x12\r\n\x05store\x18\x01 \x02(\t\x12\x32\n\x0fpartition_entry\x18\x02 \x02(\x0b\x32\x19.voldemort.PartitionEntry\x12*\n\x06\x66ilter\x18\x03 \x01(\x0b\x32\x1a.voldemort.VoldemortFilter\"A\n\x1eUpdatePartitionEntriesResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"-\n\x0fVoldemortFilter\x12\x0c\n\x04name\x18\x01 \x02(\t\x12\x0c\n\x04\x64\x61ta\x18\x02 \x02(\x0c\"\xaf\x01\n\x18UpdateSlopEntriesRequest\x12\r\n\x05store\x18\x01 \x02(\t\x12\x0b\n\x03key\x18\x02 \x02(\x0c\x12\'\n\x07version\x18\x03 \x02(\x0b\x32\x16.voldemort.VectorClock\x12,\n\x0crequest_type\x18\x04 \x02(\x0e\x32\x16.voldemort.RequestType\x12\r\n\x05value\x18\x05 \x01(\x0c\x12\x11\n\ttransform\x18\x06 \x01(\x0c\"<\n\x19UpdateSlopEntriesResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"?\n\x1a\x46\x65tchPartitionFilesRequest\x12\x12\n\npartitions\x18\x01 \x03(\x05\x12\r\n\x05store\x18\x02 \x02(\t\"\xa1\x01\n\x1c\x46\x65tchPartitionEntriesRequest\x12\x12\n\npartitions\x18\x01 \x03(\x05\x12\r\n\x05store\x18\x02 \x02(\t\x12*\n\x06\x66ilter\x18\x03 \x01(\x0b\x32\x1a.voldemort.VoldemortFilter\x12\x14\n\x0c\x66\x65tch_values\x18\x04 \x01(\x08\x12\x1c\n\x14\x66\x65tch_master_entries\x18\x05 \x01(\x08\"\x81\x01\n\x1d\x46\x65tchPartitionEntriesResponse\x12\x32\n\x0fpartition_entry\x18\x01 \x01(\x0b\x32\x19.voldemort.PartitionEntry\x12\x0b\n\x03key\x18\x02 \x01(\x0c\x12\x1f\n\x05\x65rror\x18\x03 \x01(\x0b\x32\x10.voldemort.Error\"n\n\x1d\x44\x65letePartitionEntriesRequest\x12\r\n\x05store\x18\x01 \x02(\t\x12\x12\n\npartitions\x18\x02 \x03(\x05\x12*\n\x06\x66ilter\x18\x03 \x01(\x0b\x32\x1a.voldemort.VoldemortFilter\"P\n\x1e\x44\x65letePartitionEntriesResponse\x12\r\n\x05\x63ount\x18\x01 \x01(\x05\x12\x1f\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.voldemort.Error\"\x94\x01\n\x1dInitiateFetchAndUpdateRequest\x12\x0f\n\x07node_id\x18\x01 \x02(\x05\x12\x12\n\npartitions\x18\x02 \x03(\x05\x12\r\n\x05store\x18\x03 \x02(\t\x12*\n\x06\x66ilter\x18\x04 \x01(\x0b\x32\x1a.voldemort.VoldemortFilter\x12\x13\n\x0bis_readonly\x18\x05 \x01(\x08\"1\n\x1b\x41syncOperationStatusRequest\x12\x12\n\nrequest_id\x18\x01 \x02(\x05\"/\n\x19\x41syncOperationStopRequest\x12\x12\n\nrequest_id\x18\x01 \x02(\x05\"=\n\x1a\x41syncOperationStopResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"M\n\x19\x41syncOperationListRequest\x12\x12\n\nrequest_id\x18\x01 \x02(\x05\x12\x1c\n\rshow_complete\x18\x02 \x02(\x08:\x05\x66\x61lse\"R\n\x1a\x41syncOperationListResponse\x12\x13\n\x0brequest_ids\x18\x01 \x03(\x05\x12\x1f\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.voldemort.Error\"\xbe\x02\n\x1cInitiateRebalanceNodeRequest\x12\x12\n\nstealer_id\x18\x02 \x02(\x05\x12\x10\n\x08\x64onor_id\x18\x03 \x02(\x05\x12\x12\n\npartitions\x18\x04 \x03(\x05\x12\x0f\n\x07\x61ttempt\x18\x05 \x02(\x05\x12\x18\n\x10\x64\x65letePartitions\x18\x06 \x03(\x05\x12\x18\n\x10unbalanced_store\x18\x07 \x03(\t\x12\x1d\n\x15stealMasterPartitions\x18\x08 \x03(\x05\x12@\n\x17stealer_ro_store_to_dir\x18\t \x03(\x0b\x32\x1f.voldemort.ROStoreVersionDirMap\x12>\n\x15\x64onor_ro_store_to_dir\x18\n \x03(\x0b\x32\x1f.voldemort.ROStoreVersionDirMap\"\x8a\x01\n\x1c\x41syncOperationStatusResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\x05\x12\x13\n\x0b\x64\x65scription\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\t\x12\x10\n\x08\x63omplete\x18\x04 \x01(\x08\x12\x1f\n\x05\x65rror\x18\x05 \x01(\x0b\x32\x10.voldemort.Error\"\'\n\x16TruncateEntriesRequest\x12\r\n\x05store\x18\x01 \x02(\t\":\n\x17TruncateEntriesResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"*\n\x0f\x41\x64\x64StoreRequest\x12\x17\n\x0fstoreDefinition\x18\x01 \x02(\t\"3\n\x10\x41\x64\x64StoreResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"\'\n\x12\x44\x65leteStoreRequest\x12\x11\n\tstoreName\x18\x01 \x02(\t\"6\n\x13\x44\x65leteStoreResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"P\n\x11\x46\x65tchStoreRequest\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x11\n\tstore_dir\x18\x02 \x02(\t\x12\x14\n\x0cpush_version\x18\x03 \x01(\x03\"9\n\x10SwapStoreRequest\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x11\n\tstore_dir\x18\x02 \x02(\t\"4\n\x11SwapStoreResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"@\n\x14RollbackStoreRequest\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x14\n\x0cpush_version\x18\x02 \x02(\x03\"8\n\x15RollbackStoreResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"=\n\x14ROStoreVersionDirMap\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x11\n\tstore_dir\x18\x02 \x02(\t\"/\n\x19GetROMaxVersionDirRequest\x12\x12\n\nstore_name\x18\x01 \x03(\t\"y\n\x1aGetROMaxVersionDirResponse\x12:\n\x11ro_store_versions\x18\x01 \x03(\x0b\x32\x1f.voldemort.ROStoreVersionDirMap\x12\x1f\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.voldemort.Error\"3\n\x1dGetROCurrentVersionDirRequest\x12\x12\n\nstore_name\x18\x01 \x03(\t\"}\n\x1eGetROCurrentVersionDirResponse\x12:\n\x11ro_store_versions\x18\x01 \x03(\x0b\x32\x1f.voldemort.ROStoreVersionDirMap\x12\x1f\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.voldemort.Error\"\\\n\x1eSwapStoresAndCleanStateRequest\x12:\n\x11ro_store_versions\x18\x01 \x03(\x0b\x32\x1f.voldemort.ROStoreVersionDirMap\"B\n\x1fSwapStoresAndCleanStateResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"\xa5\x0b\n\x15VoldemortAdminRequest\x12)\n\x04type\x18\x01 \x02(\x0e\x32\x1b.voldemort.AdminRequestType\x12\x33\n\x0cget_metadata\x18\x02 \x01(\x0b\x32\x1d.voldemort.GetMetadataRequest\x12\x39\n\x0fupdate_metadata\x18\x03 \x01(\x0b\x32 .voldemort.UpdateMetadataRequest\x12J\n\x18update_partition_entries\x18\x04 \x01(\x0b\x32(.voldemort.UpdatePartitionEntriesRequest\x12H\n\x17\x66\x65tch_partition_entries\x18\x05 \x01(\x0b\x32\'.voldemort.FetchPartitionEntriesRequest\x12J\n\x18\x64\x65lete_partition_entries\x18\x06 \x01(\x0b\x32(.voldemort.DeletePartitionEntriesRequest\x12K\n\x19initiate_fetch_and_update\x18\x07 \x01(\x0b\x32(.voldemort.InitiateFetchAndUpdateRequest\x12\x46\n\x16\x61sync_operation_status\x18\x08 \x01(\x0b\x32&.voldemort.AsyncOperationStatusRequest\x12H\n\x17initiate_rebalance_node\x18\t \x01(\x0b\x32\'.voldemort.InitiateRebalanceNodeRequest\x12\x42\n\x14\x61sync_operation_stop\x18\n \x01(\x0b\x32$.voldemort.AsyncOperationStopRequest\x12\x42\n\x14\x61sync_operation_list\x18\x0b \x01(\x0b\x32$.voldemort.AsyncOperationListRequest\x12;\n\x10truncate_entries\x18\x0c \x01(\x0b\x32!.voldemort.TruncateEntriesRequest\x12-\n\tadd_store\x18\r \x01(\x0b\x32\x1a.voldemort.AddStoreRequest\x12\x33\n\x0c\x64\x65lete_store\x18\x0e \x01(\x0b\x32\x1d.voldemort.DeleteStoreRequest\x12\x31\n\x0b\x66\x65tch_store\x18\x0f \x01(\x0b\x32\x1c.voldemort.FetchStoreRequest\x12/\n\nswap_store\x18\x10 \x01(\x0b\x32\x1b.voldemort.SwapStoreRequest\x12\x37\n\x0erollback_store\x18\x11 \x01(\x0b\x32\x1f.voldemort.RollbackStoreRequest\x12\x44\n\x16get_ro_max_version_dir\x18\x12 \x01(\x0b\x32$.voldemort.GetROMaxVersionDirRequest\x12L\n\x1aget_ro_current_version_dir\x18\x13 \x01(\x0b\x32(.voldemort.GetROCurrentVersionDirRequest\x12\x44\n\x15\x66\x65tch_partition_files\x18\x14 \x01(\x0b\x32%.voldemort.FetchPartitionFilesRequest\x12N\n\x1bswap_stores_and_clean_state\x18\x15 \x01(\x0b\x32).voldemort.SwapStoresAndCleanStateRequest\x12@\n\x13update_slop_entries\x18\x16 \x01(\x0b\x32#.voldemort.UpdateSlopEntriesRequest*\x9b\x04\n\x10\x41\x64minRequestType\x12\x10\n\x0cGET_METADATA\x10\x00\x12\x13\n\x0fUPDATE_METADATA\x10\x01\x12\x1c\n\x18UPDATE_PARTITION_ENTRIES\x10\x02\x12\x1b\n\x17\x46\x45TCH_PARTITION_ENTRIES\x10\x03\x12\x1c\n\x18\x44\x45LETE_PARTITION_ENTRIES\x10\x04\x12\x1d\n\x19INITIATE_FETCH_AND_UPDATE\x10\x05\x12\x1a\n\x16\x41SYNC_OPERATION_STATUS\x10\x06\x12\x1b\n\x17INITIATE_REBALANCE_NODE\x10\x07\x12\x18\n\x14\x41SYNC_OPERATION_STOP\x10\x08\x12\x18\n\x14\x41SYNC_OPERATION_LIST\x10\t\x12\x14\n\x10TRUNCATE_ENTRIES\x10\n\x12\r\n\tADD_STORE\x10\x0b\x12\x10\n\x0c\x44\x45LETE_STORE\x10\x0c\x12\x0f\n\x0b\x46\x45TCH_STORE\x10\r\x12\x0e\n\nSWAP_STORE\x10\x0e\x12\x12\n\x0eROLLBACK_STORE\x10\x0f\x12\x1a\n\x16GET_RO_MAX_VERSION_DIR\x10\x10\x12\x1e\n\x1aGET_RO_CURRENT_VERSION_DIR\x10\x11\x12\x19\n\x15\x46\x45TCH_PARTITION_FILES\x10\x12\x12\x1f\n\x1bSWAP_STORES_AND_CLEAN_STATE\x10\x13\x12\x17\n\x13UPDATE_SLOP_ENTRIES\x10\x14\x42-\n\x1cvoldemort.client.protocol.pbB\x0bVAdminProtoH\x01')
+ serialized_pb='\n\x15voldemort-admin.proto\x12\tvoldemort\x1a\x16voldemort-client.proto\"!\n\x12GetMetadataRequest\x12\x0b\n\x03key\x18\x01 \x02(\x0c\"]\n\x13GetMetadataResponse\x12%\n\x07version\x18\x01 \x01(\x0b\x32\x14.voldemort.Versioned\x12\x1f\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.voldemort.Error\"M\n\x15UpdateMetadataRequest\x12\x0b\n\x03key\x18\x01 \x02(\x0c\x12\'\n\tversioned\x18\x02 \x02(\x0b\x32\x14.voldemort.Versioned\"9\n\x16UpdateMetadataResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"7\n\tFileEntry\x12\x11\n\tfile_name\x18\x01 \x02(\t\x12\x17\n\x0f\x66ile_size_bytes\x18\x02 \x02(\x03\"F\n\x0ePartitionEntry\x12\x0b\n\x03key\x18\x01 \x02(\x0c\x12\'\n\tversioned\x18\x02 \x02(\x0b\x32\x14.voldemort.Versioned\"\x8e\x01\n\x1dUpdatePartitionEntriesRequest\x12\r\n\x05store\x18\x01 \x02(\t\x12\x32\n\x0fpartition_entry\x18\x02 \x02(\x0b\x32\x19.voldemort.PartitionEntry\x12*\n\x06\x66ilter\x18\x03 \x01(\x0b\x32\x1a.voldemort.VoldemortFilter\"A\n\x1eUpdatePartitionEntriesResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"-\n\x0fVoldemortFilter\x12\x0c\n\x04name\x18\x01 \x02(\t\x12\x0c\n\x04\x64\x61ta\x18\x02 \x02(\x0c\"\xaf\x01\n\x18UpdateSlopEntriesRequest\x12\r\n\x05store\x18\x01 \x02(\t\x12\x0b\n\x03key\x18\x02 \x02(\x0c\x12\'\n\x07version\x18\x03 \x02(\x0b\x32\x16.voldemort.VectorClock\x12,\n\x0crequest_type\x18\x04 \x02(\x0e\x32\x16.voldemort.RequestType\x12\r\n\x05value\x18\x05 \x01(\x0c\x12\x11\n\ttransform\x18\x06 \x01(\x0c\"<\n\x19UpdateSlopEntriesResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"?\n\x1a\x46\x65tchPartitionFilesRequest\x12\x12\n\npartitions\x18\x01 \x03(\x05\x12\r\n\x05store\x18\x02 \x02(\t\"\xb7\x01\n\x1c\x46\x65tchPartitionEntriesRequest\x12\x12\n\npartitions\x18\x01 \x03(\x05\x12\r\n\x05store\x18\x02 \x02(\t\x12*\n\x06\x66ilter\x18\x03 \x01(\x0b\x32\x1a.voldemort.VoldemortFilter\x12\x14\n\x0c\x66\x65tch_values\x18\x04 \x01(\x08\x12\x1c\n\x14\x66\x65tch_master_entries\x18\x05 \x01(\x08\x12\x14\n\x0cskip_records\x18\x06 \x01(\x03\"\x81\x01\n\x1d\x46\x65tchPartitionEntriesResponse\x12\x32\n\x0fpartition_entry\x18\x01 \x01(\x0b\x32\x19.voldemort.PartitionEntry\x12\x0b\n\x03key\x18\x02 \x01(\x0c\x12\x1f\n\x05\x65rror\x18\x03 \x01(\x0b\x32\x10.voldemort.Error\"n\n\x1d\x44\x65letePartitionEntriesRequest\x12\r\n\x05store\x18\x01 \x02(\t\x12\x12\n\npartitions\x18\x02 \x03(\x05\x12*\n\x06\x66ilter\x18\x03 \x01(\x0b\x32\x1a.voldemort.VoldemortFilter\"P\n\x1e\x44\x65letePartitionEntriesResponse\x12\r\n\x05\x63ount\x18\x01 \x01(\x05\x12\x1f\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.voldemort.Error\"\x94\x01\n\x1dInitiateFetchAndUpdateRequest\x12\x0f\n\x07node_id\x18\x01 \x02(\x05\x12\x12\n\npartitions\x18\x02 \x03(\x05\x12\r\n\x05store\x18\x03 \x02(\t\x12*\n\x06\x66ilter\x18\x04 \x01(\x0b\x32\x1a.voldemort.VoldemortFilter\x12\x13\n\x0bis_readonly\x18\x05 \x01(\x08\"1\n\x1b\x41syncOperationStatusRequest\x12\x12\n\nrequest_id\x18\x01 \x02(\x05\"/\n\x19\x41syncOperationStopRequest\x12\x12\n\nrequest_id\x18\x01 \x02(\x05\"=\n\x1a\x41syncOperationStopResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"M\n\x19\x41syncOperationListRequest\x12\x12\n\nrequest_id\x18\x01 \x02(\x05\x12\x1c\n\rshow_complete\x18\x02 \x02(\x08:\x05\x66\x61lse\"R\n\x1a\x41syncOperationListResponse\x12\x13\n\x0brequest_ids\x18\x01 \x03(\x05\x12\x1f\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.voldemort.Error\"\xbe\x02\n\x1cInitiateRebalanceNodeRequest\x12\x12\n\nstealer_id\x18\x02 \x02(\x05\x12\x10\n\x08\x64onor_id\x18\x03 \x02(\x05\x12\x12\n\npartitions\x18\x04 \x03(\x05\x12\x0f\n\x07\x61ttempt\x18\x05 \x02(\x05\x12\x18\n\x10\x64\x65letePartitions\x18\x06 \x03(\x05\x12\x18\n\x10unbalanced_store\x18\x07 \x03(\t\x12\x1d\n\x15stealMasterPartitions\x18\x08 \x03(\x05\x12@\n\x17stealer_ro_store_to_dir\x18\t \x03(\x0b\x32\x1f.voldemort.ROStoreVersionDirMap\x12>\n\x15\x64onor_ro_store_to_dir\x18\n \x03(\x0b\x32\x1f.voldemort.ROStoreVersionDirMap\"\x8a\x01\n\x1c\x41syncOperationStatusResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\x05\x12\x13\n\x0b\x64\x65scription\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\t\x12\x10\n\x08\x63omplete\x18\x04 \x01(\x08\x12\x1f\n\x05\x65rror\x18\x05 \x01(\x0b\x32\x10.voldemort.Error\"\'\n\x16TruncateEntriesRequest\x12\r\n\x05store\x18\x01 \x02(\t\":\n\x17TruncateEntriesResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"*\n\x0f\x41\x64\x64StoreRequest\x12\x17\n\x0fstoreDefinition\x18\x01 \x02(\t\"3\n\x10\x41\x64\x64StoreResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"\'\n\x12\x44\x65leteStoreRequest\x12\x11\n\tstoreName\x18\x01 \x02(\t\"6\n\x13\x44\x65leteStoreResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"P\n\x11\x46\x65tchStoreRequest\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x11\n\tstore_dir\x18\x02 \x02(\t\x12\x14\n\x0cpush_version\x18\x03 \x01(\x03\"9\n\x10SwapStoreRequest\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x11\n\tstore_dir\x18\x02 \x02(\t\"4\n\x11SwapStoreResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"@\n\x14RollbackStoreRequest\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x14\n\x0cpush_version\x18\x02 \x02(\x03\"8\n\x15RollbackStoreResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"=\n\x14ROStoreVersionDirMap\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x11\n\tstore_dir\x18\x02 \x02(\t\"/\n\x19GetROMaxVersionDirRequest\x12\x12\n\nstore_name\x18\x01 \x03(\t\"y\n\x1aGetROMaxVersionDirResponse\x12:\n\x11ro_store_versions\x18\x01 \x03(\x0b\x32\x1f.voldemort.ROStoreVersionDirMap\x12\x1f\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.voldemort.Error\"3\n\x1dGetROCurrentVersionDirRequest\x12\x12\n\nstore_name\x18\x01 \x03(\t\"}\n\x1eGetROCurrentVersionDirResponse\x12:\n\x11ro_store_versions\x18\x01 \x03(\x0b\x32\x1f.voldemort.ROStoreVersionDirMap\x12\x1f\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.voldemort.Error\"\\\n\x1eSwapStoresAndCleanStateRequest\x12:\n\x11ro_store_versions\x18\x01 \x03(\x0b\x32\x1f.voldemort.ROStoreVersionDirMap\"B\n\x1fSwapStoresAndCleanStateResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"\xa5\x0b\n\x15VoldemortAdminRequest\x12)\n\x04type\x18\x01 \x02(\x0e\x32\x1b.voldemort.AdminRequestType\x12\x33\n\x0cget_metadata\x18\x02 \x01(\x0b\x32\x1d.voldemort.GetMetadataRequest\x12\x39\n\x0fupdate_metadata\x18\x03 \x01(\x0b\x32 .voldemort.UpdateMetadataRequest\x12J\n\x18update_partition_entries\x18\x04 \x01(\x0b\x32(.voldemort.UpdatePartitionEntriesRequest\x12H\n\x17\x66\x65tch_partition_entries\x18\x05 \x01(\x0b\x32\'.voldemort.FetchPartitionEntriesRequest\x12J\n\x18\x64\x65lete_partition_entries\x18\x06 \x01(\x0b\x32(.voldemort.DeletePartitionEntriesRequest\x12K\n\x19initiate_fetch_and_update\x18\x07 \x01(\x0b\x32(.voldemort.InitiateFetchAndUpdateRequest\x12\x46\n\x16\x61sync_operation_status\x18\x08 \x01(\x0b\x32&.voldemort.AsyncOperationStatusRequest\x12H\n\x17initiate_rebalance_node\x18\t \x01(\x0b\x32\'.voldemort.InitiateRebalanceNodeRequest\x12\x42\n\x14\x61sync_operation_stop\x18\n \x01(\x0b\x32$.voldemort.AsyncOperationStopRequest\x12\x42\n\x14\x61sync_operation_list\x18\x0b \x01(\x0b\x32$.voldemort.AsyncOperationListRequest\x12;\n\x10truncate_entries\x18\x0c \x01(\x0b\x32!.voldemort.TruncateEntriesRequest\x12-\n\tadd_store\x18\r \x01(\x0b\x32\x1a.voldemort.AddStoreRequest\x12\x33\n\x0c\x64\x65lete_store\x18\x0e \x01(\x0b\x32\x1d.voldemort.DeleteStoreRequest\x12\x31\n\x0b\x66\x65tch_store\x18\x0f \x01(\x0b\x32\x1c.voldemort.FetchStoreRequest\x12/\n\nswap_store\x18\x10 \x01(\x0b\x32\x1b.voldemort.SwapStoreRequest\x12\x37\n\x0erollback_store\x18\x11 \x01(\x0b\x32\x1f.voldemort.RollbackStoreRequest\x12\x44\n\x16get_ro_max_version_dir\x18\x12 \x01(\x0b\x32$.voldemort.GetROMaxVersionDirRequest\x12L\n\x1aget_ro_current_version_dir\x18\x13 \x01(\x0b\x32(.voldemort.GetROCurrentVersionDirRequest\x12\x44\n\x15\x66\x65tch_partition_files\x18\x14 \x01(\x0b\x32%.voldemort.FetchPartitionFilesRequest\x12N\n\x1bswap_stores_and_clean_state\x18\x15 \x01(\x0b\x32).voldemort.SwapStoresAndCleanStateRequest\x12@\n\x13update_slop_entries\x18\x16 \x01(\x0b\x32#.voldemort.UpdateSlopEntriesRequest*\x9b\x04\n\x10\x41\x64minRequestType\x12\x10\n\x0cGET_METADATA\x10\x00\x12\x13\n\x0fUPDATE_METADATA\x10\x01\x12\x1c\n\x18UPDATE_PARTITION_ENTRIES\x10\x02\x12\x1b\n\x17\x46\x45TCH_PARTITION_ENTRIES\x10\x03\x12\x1c\n\x18\x44\x45LETE_PARTITION_ENTRIES\x10\x04\x12\x1d\n\x19INITIATE_FETCH_AND_UPDATE\x10\x05\x12\x1a\n\x16\x41SYNC_OPERATION_STATUS\x10\x06\x12\x1b\n\x17INITIATE_REBALANCE_NODE\x10\x07\x12\x18\n\x14\x41SYNC_OPERATION_STOP\x10\x08\x12\x18\n\x14\x41SYNC_OPERATION_LIST\x10\t\x12\x14\n\x10TRUNCATE_ENTRIES\x10\n\x12\r\n\tADD_STORE\x10\x0b\x12\x10\n\x0c\x44\x45LETE_STORE\x10\x0c\x12\x0f\n\x0b\x46\x45TCH_STORE\x10\r\x12\x0e\n\nSWAP_STORE\x10\x0e\x12\x12\n\x0eROLLBACK_STORE\x10\x0f\x12\x1a\n\x16GET_RO_MAX_VERSION_DIR\x10\x10\x12\x1e\n\x1aGET_RO_CURRENT_VERSION_DIR\x10\x11\x12\x19\n\x15\x46\x45TCH_PARTITION_FILES\x10\x12\x12\x1f\n\x1bSWAP_STORES_AND_CLEAN_STATE\x10\x13\x12\x17\n\x13UPDATE_SLOP_ENTRIES\x10\x14\x42-\n\x1cvoldemort.client.protocol.pbB\x0bVAdminProtoH\x01')
_ADMINREQUESTTYPE = descriptor.EnumDescriptor(
name='AdminRequestType',
@@ -105,8 +105,8 @@
],
containing_type=None,
options=None,
- serialized_start=5090,
- serialized_end=5629,
+ serialized_start=5112,
+ serialized_end=5651,
)
@@ -606,7 +606,7 @@
descriptor.FieldDescriptor(
name='skip_records', full_name='voldemort.FetchPartitionEntriesRequest.skip_records', index=5,
number=6, type=3, cpp_type=2, label=1,
- default_value=0,
+ has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
@@ -620,7 +620,7 @@
is_extendable=False,
extension_ranges=[],
serialized_start=1022,
- serialized_end=1183,
+ serialized_end=1205,
)
@@ -661,8 +661,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=1186,
- serialized_end=1315,
+ serialized_start=1208,
+ serialized_end=1337,
)
@@ -703,8 +703,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=1317,
- serialized_end=1427,
+ serialized_start=1339,
+ serialized_end=1449,
)
@@ -738,8 +738,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=1429,
- serialized_end=1509,
+ serialized_start=1451,
+ serialized_end=1531,
)
@@ -794,8 +794,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=1512,
- serialized_end=1660,
+ serialized_start=1534,
+ serialized_end=1682,
)
@@ -822,8 +822,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=1662,
- serialized_end=1711,
+ serialized_start=1684,
+ serialized_end=1733,
)
@@ -850,8 +850,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=1713,
- serialized_end=1760,
+ serialized_start=1735,
+ serialized_end=1782,
)
@@ -878,8 +878,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=1762,
- serialized_end=1823,
+ serialized_start=1784,
+ serialized_end=1845,
)
@@ -913,8 +913,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=1825,
- serialized_end=1902,
+ serialized_start=1847,
+ serialized_end=1924,
)
@@ -948,8 +948,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=1904,
- serialized_end=1986,
+ serialized_start=1926,
+ serialized_end=2008,
)
@@ -1032,8 +1032,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=1989,
- serialized_end=2307,
+ serialized_start=2011,
+ serialized_end=2329,
)
@@ -1088,8 +1088,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=2310,
- serialized_end=2448,
+ serialized_start=2332,
+ serialized_end=2470,
)
@@ -1116,8 +1116,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=2450,
- serialized_end=2489,
+ serialized_start=2472,
+ serialized_end=2511,
)
@@ -1144,8 +1144,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=2491,
- serialized_end=2549,
+ serialized_start=2513,
+ serialized_end=2571,
)
@@ -1172,8 +1172,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=2551,
- serialized_end=2593,
+ serialized_start=2573,
+ serialized_end=2615,
)
@@ -1200,8 +1200,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=2595,
- serialized_end=2646,
+ serialized_start=2617,
+ serialized_end=2668,
)
@@ -1228,8 +1228,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=2648,
- serialized_end=2687,
+ serialized_start=2670,
+ serialized_end=2709,
)
@@ -1256,8 +1256,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=2689,
- serialized_end=2743,
+ serialized_start=2711,
+ serialized_end=2765,
)
@@ -1298,8 +1298,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=2745,
- serialized_end=2825,
+ serialized_start=2767,
+ serialized_end=2847,
)
@@ -1333,8 +1333,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=2827,
- serialized_end=2884,
+ serialized_start=2849,
+ serialized_end=2906,
)
@@ -1361,8 +1361,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=2886,
- serialized_end=2938,
+ serialized_start=2908,
+ serialized_end=2960,
)
@@ -1396,8 +1396,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=2940,
- serialized_end=3004,
+ serialized_start=2962,
+ serialized_end=3026,
)
@@ -1424,8 +1424,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=3006,
- serialized_end=3062,
+ serialized_start=3028,
+ serialized_end=3084,
)
@@ -1459,8 +1459,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=3064,
- serialized_end=3125,
+ serialized_start=3086,
+ serialized_end=3147,
)
@@ -1487,8 +1487,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=3127,
- serialized_end=3174,
+ serialized_start=3149,
+ serialized_end=3196,
)
@@ -1522,8 +1522,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=3176,
- serialized_end=3297,
+ serialized_start=3198,
+ serialized_end=3319,
)
@@ -1550,8 +1550,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=3299,
- serialized_end=3350,
+ serialized_start=3321,
+ serialized_end=3372,
)
@@ -1585,8 +1585,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=3352,
- serialized_end=3477,
+ serialized_start=3374,
+ serialized_end=3499,
)
@@ -1613,8 +1613,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=3479,
- serialized_end=3571,
+ serialized_start=3501,
+ serialized_end=3593,
)
@@ -1641,8 +1641,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=3573,
- serialized_end=3639,
+ serialized_start=3595,
+ serialized_end=3661,
)
@@ -1816,8 +1816,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=3642,
- serialized_end=5087,
+ serialized_start=3664,
+ serialized_end=5109,
)
import voldemort_client_pb2
View
0  clients/python/voldemort_client_pb2.py → ...python/voldemort/protocol/voldemort_client_pb2.py
File renamed without changes
View
8 clients/python/voldemort/serialization/__init__.py
@@ -0,0 +1,8 @@
+from common import SerializationException
+from json_serializer import JsonTypeSerializer
+from string_serializer import StringSerializer
+
+SERIALIZER_CLASSES = {
+ "string": StringSerializer,
+ "json": JsonTypeSerializer,
+}
View
3  clients/python/voldemort/serialization/common.py
@@ -0,0 +1,3 @@
+class SerializationException(Exception):
+ pass
+
View
1,252 clients/python/voldemort/serialization/json_serializer.py
@@ -0,0 +1,1252 @@
+from common import *
+
+import cStringIO as StringIO
+import datetime
+import struct
+import simplejson
+from xml.dom import minidom
+
+# get the real OrderedDict if we're on 2.7
+try:
+ from collections import OrderedDict
+except ImportError, e:
+ from ordered_dict import OrderedDict
+
+
+TYPES = {
+ "int8": (int, long),
+ "int16": (int, long),
+ "int32": (int, long),
+ "int64": (int, long),
+ "float32": (float,),
+ "float64": (float,),
+ "boolean": (bool,),
+ "string": (str, unicode),
+ "bytes": (str,),
+ "date": (datetime.datetime,),
+}
+
+MAXES = {
+ "int8": (1 << 7) - 1,
+ "int16": (1 << 15) - 1,
+ "int32": (1 << 31) - 1,
+ "int64": (1 << 63) - 1,
+ "float32": struct.unpack(">f", "\x7f\x7f\xff\xff")[0],
+ "float64": struct.unpack(">d", "\x7f\xef\xff\xff\xff\xff\xff\xff")[0],
+}
+
+MINS = {
+ "int8": -(1 << 7),
+ "int16": -(1 << 15),
+ "int32": -(1 << 31),
+ "int64": -(1 << 63),
+ "float32": struct.unpack(">f", "\x00\x00\x00\x01")[0],
+ "float64": struct.unpack(">d", "\x00\x00\x00\x00\x00\x00\x00\x01")[0],
+}
+
+def _is_valid_float(f, typedef):
+ return abs(f) <= MAXES[typedef] and (f == 0.0 or abs(f) > MINS[typedef])
+
+def _is_valid_int(i, typedef):
+ return i > MINS[typedef] and i <= MAXES[typedef]
+
+RANGE_FNS = {
+ "int8": _is_valid_int,
+ "int16": _is_valid_int,
+ "int32": _is_valid_int,
+ "int64": _is_valid_int,
+ "float32": _is_valid_float,
+ "float64": _is_valid_float,
+}
+
+FORMATS = {
+ "int8": ">b",
+ "int16": ">h",
+ "int32": ">i",
+ "int64": ">q",
+ "float32": ">f",
+ "float64": ">d",
+}
+
+MAX_SEQ_LENGTH = 0x3FFFFFFF
+
+EPOCH = datetime.datetime(1970, 1, 1, 0, 0, 0, 0)
+def _to_java_date(date):
+ """
+ Converts a python datetime into an int64 representing milliseconds since
+ 00:00:00 1/1/1970 GMT. This is the expected input to the Java Date class.
+ Microseconds in the Python datetime representation are truncated.
+
+ >>> d = datetime.datetime(2010, 11, 24, 11, 50, 34, 237861)
+ >>> _to_java_date(d)
+ 1290599434237
+
+ >>> _from_java_date(_to_java_date(d))
+ datetime.datetime(2010, 11, 24, 11, 50, 34, 237000)
+ """
+
+ td = date - EPOCH
+ ms = td.days * 24 * 3600 * 1000 + td.seconds * 1000 + td.microseconds / 1000
+ return ms
+
+
+def _from_java_date(javaDate):
+ """
+ Converts an int64 representing a Java Date as milliseconds since
+ 00:00:00 1/1/1970 GMT into a Python datetime.
+
+ >>> _from_java_date(1000000000000)
+ datetime.datetime(2001, 9, 9, 1, 46, 40)
+
+ >>> _to_java_date(_from_java_date(1000000000000))
+ 1000000000000
+ """
+
+ td = datetime.timedelta(microseconds = javaDate * 1000)
+ return EPOCH + td
+
+
+class JsonTypeSerializer(object):
+ """
+ Python implementation of the Voldemort JsonTypeSerializer class, which converts structured
+ types consisting of lists, dicts, and primitive types (strings, ints, floats, dates) into
+ the proprietary binary representation used by Voldemort.
+ """
+
+ def __init__(self, typedef, has_version=False):
+ r"""
+ Constructor. The typedef is either a json string containing the schema definition or a
+ map from integer version number to schema json.
+
+ A simple json schema results in a non-versioned serializer (note that the output is
+ only the 4-byte binary value WITHOUT a version number prefix:
+
+ >>> s = JsonTypeSerializer('"int32"')
+ >>> s.writes(42)
+ '\x00\x00\x00*'
+
+ A dict typedef will always yield a versioned serializer (note that now there is a version
+ number byte appended to the output:
+
+ >>> s = JsonTypeSerializer({1: '"int32"'})
+ >>> s.writes(42)
+ '\x01\x00\x00\x00*'
+
+ Setting has_version=True will also return a versioned serializer, with an implied version 0:
+
+ >>> s = JsonTypeSerializer('"int32"', has_version=True)
+ >>> s.writes(42)
+ '\x00\x00\x00\x00*'
+
+ """
+
+ self._has_version = has_version or isinstance(typedef, dict)
+
+ if not isinstance(typedef, dict):
+ typeobj = simplejson.loads(typedef, object_pairs_hook=OrderedDict)
+ if self._has_version:
+ self._typedef = dict([(0, typeobj)])
+ else:
+ self._typedef = typeobj
+ else:
+ self._typedef = dict((k, simplejson.loads(v, object_pairs_hook=OrderedDict))
+ for k, v in typedef.iteritems())
+
+
+ @staticmethod
+ def create_from_xml(node):
+ r"""
+ Static factory method that creates a serializer from then XML description in a
+ stores.xml file.
+
+ >>> from xml.dom import minidom
+ >>> xml = minidom.parseString(
+ ... '<serializer><type>json</type><schema-info version="0">"int32"</schema-info></serializer>')
+ >>> s = JsonTypeSerializer.create_from_xml(xml)
+ >>> s.writes(42)
+ '\x00\x00\x00\x00*'
+
+ Output always uses the latest version:
+ >>> xml = minidom.parseString(
+ ... '<serializer><type>json</type>' +
+ ... '<schema-info version="0">"int32"</schema-info>' +
+ ... '<schema-info version="1">"int64"</schema-info></serializer>')
+ >>> s = JsonTypeSerializer.create_from_xml(xml)
+ >>> s.writes(42)
+ '\x01\x00\x00\x00\x00\x00\x00\x00*'
+
+ Duplicate version numbers are an error:
+ >>> xml = minidom.parseString(
+ ... '<serializer><type>json</type>' +
+ ... '<schema-info version="0">"int32"</schema-info>' +
+ ... '<schema-info version="0">"int64"</schema-info></serializer>')
+ >>> s = JsonTypeSerializer.create_from_xml(xml)
+ Traceback (most recent call last):
+ ...
+ SerializationException: Schema info has duplicates of version: 0
+
+ The version "none" means no versioning, and no version will be output in the byte stream:
+ >>> xml = minidom.parseString(
+ ... '<serializer><type>json</type><schema-info version="none">"int32"</schema-info></serializer>')
+ >>> s = JsonTypeSerializer.create_from_xml(xml)
+ >>> s.writes(42)
+ '\x00\x00\x00*'
+
+ You can't mix a "none" version with a regular version number:
+ >>> xml = minidom.parseString(
+ ... '<serializer><type>json</type>' +
+ ... '<schema-info version="none">"int32"</schema-info>' +
+ ... '<schema-info version="0">"int64"</schema-info></serializer>')
+ >>> s = JsonTypeSerializer.create_from_xml(xml)
+ Traceback (most recent call last):
+ ...
+ SerializationException: Schema info has duplicates of version: 0
+ """
+
+ typedef = dict()
+
+ has_version = True
+ for schema_info in node.getElementsByTagName('schema-info'):
+ version = schema_info.getAttribute('version')
+ if not version:
+ version = 0
+ elif version == 'none':
+ version = 0
+ has_version = False
+ else:
+ version = int(version)
+
+ if version in typedef:
+ raise SerializationException('Schema info has duplicates of version: %d' % version)
+
+ typedef[version] = ''.join(elem.data for elem in schema_info.childNodes
+ if elem.nodeType == minidom.Node.TEXT_NODE)
+
+ if not typedef:
+ raise SerializationException('No schemas specified')
+
+ if not has_version and len(typedef) > 1:
+ raise SerializationException('Schema info has version="none" and multiple versions')
+
+ if not has_version:
+ return JsonTypeSerializer(typedef[0])
+ else:
+ return JsonTypeSerializer(typedef)
+
+
+ def read(self, input):
+ r"""
+ Reads a serialized object from the file-like object input:
+
+ >>> f = StringIO.StringIO('\x00\x00\x00*')
+ >>> s = JsonTypeSerializer('"int32"')
+ >>> s.read(f)
+ 42
+
+ >>> versioned = '\00\x01>\x80\x00\x00\x00\x03\x00\x01\x00\x02\x00\x03\x00\x03foo'
+ >>> non_versioned = '\x01>\x80\x00\x00\x00\x03\x00\x01\x00\x02\x00\x03\x00\x03foo'
+
+ More complex types are also supported:
+ >>> s = JsonTypeSerializer('{ "a":"float32", "b":["int16"], "c":"string" }')
+ >>> f = StringIO.StringIO(non_versioned)
+ >>> s.read(f) == {'a': 0.25, 'b': [1, 2, 3], 'c':u'foo'}
+ True
+
+ Non-versioned serializers can't read versioned binary representations:
+
+ >>> f = StringIO.StringIO(versioned)
+ >>> s.read(f)
+ Traceback (most recent call last):
+ ...
+ SerializationException: Unexpected end of input.
+
+ And vice-version, versioned serializers will only read versioned binary representations:
+
+ >>> s = JsonTypeSerializer('{ "a":"float32", "b":["int16"], "c":"string" }', has_version=True)
+ >>> f = StringIO.StringIO(versioned)
+ >>> s.read(f) == {'a': 0.25, 'b': [1, 2, 3], 'c': u'foo'}
+ True
+
+ >>> f = StringIO.StringIO(non_versioned)
+ >>> s.read(f)
+ Traceback (most recent call last):
+ ...
+ KeyError: 1
+
+ The error messages from reading improperly versioned representations aren't super helpful...
+ """
+
+ if self._has_version:
+ version = self._read_int8(input)
+ typedef = self._typedef[version]
+ else:
+ typedef = self._typedef
+
+ return self._read(input, typedef)
+
+ def reads(self, s):
+ r"""
+ Reads a serialized object from given string:
+
+ >>> s = JsonTypeSerializer('"int32"')
+ >>> s.reads('\x00\x00\x00*')
+ 42
+
+ Same rules as read()/write() with regard to versioned representations apply to reads()/writes():
+
+ >>> versioned = '\00\x01>\x80\x00\x00\x00\x03\x00\x01\x00\x02\x00\x03\x00\x03foo'
+ >>> non_versioned = '\x01>\x80\x00\x00\x00\x03\x00\x01\x00\x02\x00\x03\x00\x03foo'