Skip to content

Commit

Permalink
pickable namedtuple
Browse files Browse the repository at this point in the history
  • Loading branch information
davies committed Jul 28, 2014
1 parent a7d145e commit 93b03b8
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 0 deletions.
29 changes: 29 additions & 0 deletions python/pyspark/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
import marshal
import struct
import sys
import collections

from pyspark import cloudpickle


Expand Down Expand Up @@ -267,6 +269,33 @@ def dumps(self, obj):
return obj


# Hook namedtuple, make it picklable
# pyspark should be imported before 'from collections import namedtuple'

old_namedtuple = collections.namedtuple
__cls = {}

def _restore(name, fields, value):
k = (name, fields)
cls = __cls.get(k)
if cls is None:
cls = namedtuple(name, fields)
__cls[k] = cls
return cls(*value)

def namedtuple(name, fields, verbose=False, rename=False):
""" Pickable namedtuple """
cls = old_namedtuple(name, fields, verbose, rename)

def __reduce__(self):
return (_restore, (name, fields, tuple(self)))

cls.__reduce__ = __reduce__
return cls

collections.namedtuple = namedtuple


class PickleSerializer(FramedSerializer):
"""
Serializes objects using Python's cPickle serializer:
Expand Down
8 changes: 8 additions & 0 deletions python/pyspark/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,14 @@ def combOp(x, y):
self.assertEqual(set([2]), sets[3])
self.assertEqual(set([1, 3]), sets[5])

def test_namedtuple_in_rdd(self):
from collections import namedtuple
Person = namedtuple("Person", "id firstName lastName")
jon = Person(1, "Jon", "Doe")
jane = Person(2, "Jane", "Doe")
theDoes = self.sc.parallelize([jon, jane])
self.assertEquals([jon, jane], theDoes.collect())


class TestIO(PySparkTestCase):

Expand Down

0 comments on commit 93b03b8

Please sign in to comment.