Skip to content

Commit

Permalink
Merge pull request #33 from svenkreiss/avoid-tee
Browse files Browse the repository at this point in the history
avoid itertools.tee but still lazily load partition data
  • Loading branch information
svenkreiss committed Feb 27, 2016
2 parents 7e37571 + 577fab4 commit bd21840
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 9 deletions.
8 changes: 4 additions & 4 deletions pysparkling/partition.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import absolute_import

import logging
import itertools

log = logging.getLogger(__name__)

@@ -12,14 +11,15 @@ def __init__(self, x, idx=None):
self._x = x

def x(self):
self._x, r = itertools.tee(self._x)
return r
if not isinstance(self._x, list):
self._x = list(self._x)
return self._x

def hashCode(self):
return self.index

def __getstate__(self):
return {
'index': self.index,
'_x': list(self.x())
'_x': self.x(),
}
9 changes: 4 additions & 5 deletions pysparkling/rdd.py
Original file line number Diff line number Diff line change
@@ -47,7 +47,7 @@ class RDD(object):
"""

def __init__(self, partitions, ctx):
self._p = partitions
self._p = list(partitions)
self.context = ctx
self._name = None
self._rdd_id = ctx.newRddId()
@@ -64,8 +64,7 @@ def compute(self, split, task_context):
return split.x()

def partitions(self):
self._p, r = itertools.tee(self._p, 2)
return r
return self._p

"""
@@ -644,7 +643,7 @@ def getNumPartitions(self):
Returns the number of partitions.
"""
return sum(1 for _ in self.partitions())
return len(self.partitions())

def getPartitions(self):
"""
@@ -1629,7 +1628,7 @@ def takeSample(self, n):
"""

rnd_entries = sorted([random.random() for _ in range(n)])
num_partitions = sum(1 for _ in self.partitions())
num_partitions = self.getNumPartitions()

rnd_entries = [
(
Expand Down

0 comments on commit bd21840

Please sign in to comment.