Removing classproperty and updating docs after the Sphinx pointer
mtai committed Aug 23, 2011
1 parent 943894b commit 8a28e81
Showing 5 changed files with 80 additions and 37 deletions.
1 change: 0 additions & 1 deletion docs/_build/.gitignore

This file was deleted.

32 changes: 30 additions & 2 deletions docs/protocols.rst
@@ -4,10 +4,38 @@ mrjob.protocol - input and output
 .. automodule:: mrjob.protocol

 .. autodata:: DEFAULT_PROTOCOL
-.. autoclass:: ProtocolRegistar
+.. autoclass:: ProtocolRegistrar
+
+----
+
+Abstract base classes
+
 .. autoclass:: HadoopStreamingProtocol
-    :members:
+    :members: __init__, read, write
+
+.. autoclass:: TabSplitProtocol
+    :show-inheritance:
+    :members: load_from_string, dump_to_string
+
+.. autoclass:: ValueOnlyProtocol
+    :show-inheritance:
+    :members: load_from_string, dump_to_string
+
+----
+
+Implemented protocols, available via :py:attr:`ProtocolRegistrar.mapping`
+
+============ ===============================
+name         class
+============ ===============================
+json         :py:class:`JSONProtocol`
+json_value   :py:class:`JSONValueProtocol`
+pickle       :py:class:`PickleProtocol`
+pickle_value :py:class:`PickleValueProtocol`
+raw_value    :py:class:`RawValueProtocol`
+repr         :py:class:`ReprProtocol`
+repr_value   :py:class:`ReprValueProtocol`
+============ ===============================
+
 .. autoclass:: JSONProtocol
 .. autoclass:: JSONValueProtocol
 .. autoclass:: PickleProtocol
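For context: the named protocols in the table above all share one line format. Here is a minimal sketch of how a json-style, tab-split protocol round-trips a single Hadoop Streaming line, assuming the key and value are JSON-encoded and joined by a tab as in the TabSplitProtocol code further down. This block is illustrative, not repository code.

import json

def read(line):
    # split the streaming line on the first tab, then JSON-decode each half
    raw_key, raw_value = line.rstrip('\r\n').split('\t', 1)
    return json.loads(raw_key), json.loads(raw_value)

def write(key, value):
    # JSON-encode key and value, joined with a tab
    return '%s\t%s' % (json.dumps(key), json.dumps(value))

line = write(['foo', 42], {'bar': 1})   # '["foo", 42]\t{"bar": 1}'
assert read(line) == (['foo', 42], {'bar': 1})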
22 changes: 11 additions & 11 deletions mrjob/job.py
@@ -110,7 +110,7 @@ def reducer(self, word, occurrences):
 from mrjob.conf import combine_dicts
 from mrjob.parse import parse_mr_job_stderr, check_kv_pair, check_range_list
 from mrjob.runner import CLEANUP_CHOICES, CLEANUP_DEFAULT
-from mrjob.util import log_to_stream, read_input, classproperty
+from mrjob.util import log_to_stream, read_input


 # used by mr() below, to fake no mapper
 def _IDENTITY_MAPPER(key, value):
@@ -589,15 +589,15 @@ def pick_protocols(self, step_num, step_type):
         else:
             read_protocol = self.options.protocol

-        read_protocol_cls = self.protocols[read_protocol]
+        read_protocol_cls = self.protocols()[read_protocol]
         read = read_protocol_cls(step_type).read

         if step_num == len(steps_desc) - 1 and step_type == steps_desc[-1][-1]:
             write_protocol = self.options.output_protocol
         else:
             write_protocol = self.options.protocol

-        write_protocol_cls = self.protocols[write_protocol]
+        write_protocol_cls = self.protocols()[write_protocol]
         write = write_protocol_cls(step_type).write

         return read, write
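A simplified sketch of the selection logic in this hunk (illustrative only: the read-side condition sits partly outside the visible diff, and the assumption that the final phase is a reducer is hypothetical). The first mapper reads job input with input_protocol, the last phase of the last step writes output with output_protocol, and every internal step boundary uses protocol.

def pick_protocol_names(step_num, step_type, num_steps):
    # the very first mapper reads raw job input
    if step_num == 0 and step_type == 'M':
        read = 'input_protocol'
    else:
        read = 'protocol'
    # the last phase of the last step writes final job output
    if step_num == num_steps - 1 and step_type == 'R':
        write = 'output_protocol'
    else:
        write = 'protocol'
    return read, write

assert pick_protocol_names(0, 'M', 2) == ('input_protocol', 'protocol')
assert pick_protocol_names(1, 'R', 2) == ('protocol', 'output_protocol')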
@@ -641,7 +641,7 @@ def configure_options(self):
             help='which step to execute (default is 0)')

         # protocol stuff
-        protocol_choices = tuple(sorted(self.protocols))
+        protocol_choices = tuple(sorted(self.protocols()))
         self.proto_opt_group = OptionGroup(
             self.option_parser, 'Protocols')
         self.option_parser.add_option_group(self.proto_opt_group)
@@ -1210,19 +1210,19 @@ def generate_file_upload_args(self):

     ### protocols ###

-    @classproperty
+    @classmethod
     def protocols(cls):
         """Mapping from protocol name to the protocol class to use
         for parsing job input and writing job output. We give protocols names
         so that we can easily choose them from the command line.

-        This returns :py:data:`mrjob.protocol.ProtocolRegistrar.mapping` by default.
+        Returns :py:data:`mrjob.protocol.ProtocolRegistrar.mapping()` by default.

         To add a custom protocol, define a subclass of
-        :py:class:`mrjob.protocol.HadoopStreamingProtocol`, and
-        then ProtocolRegistrar will pick up your new mapping
+        :py:class:`mrjob.protocol.HadoopStreamingProtocol` and make sure it gets imported.
+        ProtocolRegistrar will automatically pick up your new protocol.
         """
-        return mrjob.protocol.ProtocolRegistrar.mapping # copy to stop monkey-patching
+        return mrjob.protocol.ProtocolRegistrar.mapping() # copy to stop monkey-patching

     #: Default protocol for reading input to the first mapper in your job.
     #: Default: ``'raw_value'``.
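A sketch of what the updated docstring describes (illustrative, not repository code; CSVValueProtocol and the name 'csv_value' are hypothetical): subclass one of the abstract bases, set a class-level name, and simply importing the module lets ProtocolRegistrar pick it up.

from mrjob.protocol import ValueOnlyProtocol

class CSVValueProtocol(ValueOnlyProtocol):
    """Hypothetical protocol: each value is a list of comma-separated fields."""
    name = 'csv_value'

    def load_from_string(self, string_to_read):
        # deserialize one raw line into a list of fields
        return string_to_read.split(',')

    def dump_to_string(self, object_to_dump):
        # serialize a list of fields back into one raw line
        return ','.join(str(field) for field in object_to_dump)

# once this module has been imported, the metaclass has registered the class,
# so (assuming the registration behavior documented above) 'csv_value' appears
# in MRJob.protocols() and can be selected with --protocol csv_value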
@@ -1272,7 +1272,7 @@ def parse_output_line(self, line):
             for line in runner.stream_output():
                 key, value = mr_job.parse_output_line(line)
         """
-        reader = self.protocols[self.options.output_protocol]()
+        reader = self.protocols()[self.options.output_protocol]()
         return reader.read(line)

     ### Testing ###
@@ -1360,7 +1360,7 @@ def parse_output(self, protocol=DEFAULT_PROTOCOL):
         if self.stdout == sys.stdout:
             raise AssertionError('You must call sandbox() first; parse_output() is for testing only.')

-        reader_cls = self.protocols[protocol]
+        reader_cls = self.protocols()[protocol]
         reader = reader_cls()
         lines = StringIO(self.stdout.getvalue())
         return [reader.read(line) for line in lines]
54 changes: 39 additions & 15 deletions mrjob/protocol.py
@@ -30,31 +30,20 @@
 # since MRJobs need to run in Amazon's generic EMR environment
 import cPickle

-from mrjob.util import classproperty
 from mrjob.util import safeeval

 try:
     import simplejson as json  # preferred because of C speedups
 except ImportError:
     import json  # built in to Python 2.6 and later


-#: Default mapping from protocol name to class:
-#:
-#: ============ ===============================
-#: name         class
-#: ============ ===============================
-#: json         :py:class:`JSONProtocol`
-#: json_value   :py:class:`JSONValueProtocol`
-#: pickle       :py:class:`PickleProtocol`
-#: pickle_value :py:class:`PickleValueProtocol`
-#: raw_value    :py:class:`RawValueProtocol`
-#: repr         :py:class:`ReprProtocol`
-#: repr_value   :py:class:`ReprValueProtocol`
-#: ============ ===============================
 class ProtocolRegistrar(type):
+    """Central registry for all declared HadoopStreamingProtocols"""
     _name_to_class_map = {}

     def __new__(mcs, name, bases, attrs):
+        """For every new Protocol declaration, register the class here"""
         new_cls = super(ProtocolRegistrar, mcs).__new__(mcs, name, bases, attrs)

         mapping_name = new_cls.name
@@ -66,7 +55,7 @@ def __new__(mcs, name, bases, attrs):
         mcs._name_to_class_map[mapping_name] = new_cls
         return new_cls

-    @classproperty
+    @classmethod
     def mapping(mcs):
         return mcs._name_to_class_map
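For readers unfamiliar with the pattern, a minimal standalone sketch of a registry metaclass like ProtocolRegistrar (generic Python 2, not mrjob's exact code; the duplicate-name check elided above is omitted). type.__new__ runs once per class statement, so every subclass definition registers itself as a side effect.

class Registrar(type):
    _name_to_class_map = {}

    def __new__(mcs, name, bases, attrs):
        new_cls = super(Registrar, mcs).__new__(mcs, name, bases, attrs)
        # only register classes that declare a name of their own
        if attrs.get('name'):
            mcs._name_to_class_map[attrs['name']] = new_cls
        return new_cls

    @classmethod
    def mapping(mcs):
        return dict(mcs._name_to_class_map)  # hand back a copy

class BaseProtocol(object):
    __metaclass__ = Registrar  # Python 2 syntax, matching this codebase
    name = None

class FooProtocol(BaseProtocol):
    name = 'foo'

assert Registrar.mapping() == {'foo': FooProtocol}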


@@ -78,6 +67,11 @@ class HadoopStreamingProtocol(object):
     name = None

     def __init__(self, step_type=None):
+        """Initialize a new protocol.
+        :type step_type: str
+        :param step_type: 'M' / 'R' / None - Mapper / Reducer / Unknown
+        """
         self._step_type = step_type
         assert self.name is not None, "Protocol name missing"


@@ -101,6 +95,9 @@ def write(self, key, value):
         raise NotImplementedError

 class TabSplitProtocol(HadoopStreamingProtocol):
+    """Abstract base class for protocols that split key and value with a '\t' character.
+    Inherit from it and define your own :py:meth:`load_from_string` and :py:meth:`dump_to_string` methods.
+    """
     name = '__tab_split__'

     def __init__(self, step_type=None):
@@ -134,12 +131,27 @@ def write(self, object_key, object_value):
         return '%s\t%s' % (raw_key, raw_value)

     def load_from_string(self, string_to_read):
+        """Deserialize an object from a string.
+        :type string_to_read: str
+        :param string_to_read: A string to deserialize into a Python object.
+        :return: The deserialized Python object."""
         raise NotImplementedError

     def dump_to_string(self, object_to_dump):
+        """Serialize an object to a string.
+        :type object_to_dump: object
+        :param object_to_dump: An object to serialize to a string.
+        :return: A string representing the serialized form of ``object_to_dump``."""
         raise NotImplementedError


 class ValueOnlyProtocol(HadoopStreamingProtocol):
+    """Abstract base class for protocols that read/write lines as values only, with no keys.
+    Inherit from it and define your own :py:meth:`load_from_string` and :py:meth:`dump_to_string` methods.
+    """
     name = '__value_only__'


     def read(self, line):
@@ -149,9 +161,21 @@ def write(self, object_key, object_value):
         return self.dump_to_string(object_value)

     def load_from_string(self, string_to_read):
+        """Deserialize an object from a string.
+        :type string_to_read: str
+        :param string_to_read: A string to deserialize into a Python object.
+        :return: The deserialized Python object."""
         raise NotImplementedError

     def dump_to_string(self, object_item):
+        """Serialize an object to a string.
+        :type object_item: object
+        :param object_item: An object to serialize to a string.
+        :return: A string representing the serialized form of ``object_item``."""
         raise NotImplementedError


 class JSONMixin(object):
8 changes: 0 additions & 8 deletions mrjob/util.py
@@ -307,11 +307,3 @@ def unarchive(archive_path, dest):
             dest_file.write(archive.read(name))
     else:
         raise IOError('Unknown archive type: %s' % (archive_path,))
-
-# http://groups.google.com/group/comp.lang.python/browse_thread/thread/4dce045a99f29825/328ca175947199da
-class classproperty(object):
-    def __init__(self, getter):
-        self._getter = getter
-
-    def __get__(self, instance, owner):
-        return self._getter(owner)
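The call-site churn in mrjob/job.py above (self.protocols becoming self.protocols()) follows directly from this deletion: classproperty exposed a class-level getter as a plain attribute, while an ordinary classmethod must be called. A small self-contained sketch (not repository code) contrasting the two styles:

class classproperty(object):
    # the deleted descriptor: exposes a class-level getter as an attribute
    def __init__(self, getter):
        self._getter = getter

    def __get__(self, instance, owner):
        return self._getter(owner)

class Before(object):
    @classproperty
    def protocols(cls):
        return {'json': 'JSONProtocol'}

class After(object):
    @classmethod
    def protocols(cls):
        return {'json': 'JSONProtocol'}

assert Before().protocols == {'json': 'JSONProtocol'}   # plain attribute access
assert After().protocols() == {'json': 'JSONProtocol'}  # explicit call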
