Skip to content

Commit

Permalink
Merge pull request #48 from svenkreiss/group-by-key-performance
Browse files Browse the repository at this point in the history
rewrite groupByKey()
  • Loading branch information
svenkreiss committed Jun 18, 2016
2 parents 6fff30f + ccf4da8 commit ed358f2
Showing 1 changed file with 5 additions and 6 deletions.
11 changes: 5 additions & 6 deletions pysparkling/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,12 +677,11 @@ def groupByKey(self, numPartitions=None):
if numPartitions is None:
numPartitions = self.getNumPartitions()

return self.context.parallelize((
(k, [gg[1] for gg in g]) for k, g in itertools.groupby(
sorted(self.collect(), key=itemgetter(0)),
lambda e: e[0],
)
), numPartitions)
r = defaultdict(list)
for key, value in self.toLocalIterator():
r[key].append(value)

return self.context.parallelize(r.items(), numPartitions)

def histogram(self, buckets):
"""histogram
Expand Down

0 comments on commit ed358f2

Please sign in to comment.