Skip to content

Commit

Permalink
Merge pull request #66 from svenkreiss/multiprocessing-zipwithindex
Browse files Browse the repository at this point in the history
fix zipWithIndex() in multiprocessing
  • Loading branch information
svenkreiss committed Jun 18, 2017
2 parents c9fb7c7 + 84fa250 commit 31cdfa0
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
4 changes: 1 addition & 3 deletions pysparkling/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@
class Partition(object):
def __init__(self, x, idx=None):
self.index = idx
- self._x = x
+ self._x = list(x)

def x(self):
- if not isinstance(self._x, list):
- self._x = list(self._x)
return self._x

def hashCode(self):
Expand Down
12 changes: 12 additions & 0 deletions tests/test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,18 @@ def test_basic(self):
r = self.sc.parallelize([1, 3, 4]).map(math.sqrt).collect()
self.assertIn(2, r)

def test_zipWithIndex(self):
"""Prevent regression in zipWithIndex().
Test the case of parallelizing data directly from toLocalIterator()
in the multiprocessing case.
"""
r = (self.sc
.parallelize([1, 3, 4, 9, 15, 25, 50, 75, 100], 3)
.zipWithIndex()
.collect())
self.assertIn((4, 2), r)

def test_cache(self):
r = self.sc.parallelize(range(3), 3)
r = r.map(lambda _: time.sleep(0.1)).cache()
Expand Down

0 comments on commit 31cdfa0

Please sign in to comment.