Skip to content

Commit

Permalink
RDD.repartitionAndSortWithinPartitions() (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
svenkreiss committed May 2, 2017
1 parent d27dd99 commit dbd4006
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 4 deletions.
52 changes: 49 additions & 3 deletions pysparkling/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,17 @@ def partitionBy(self, numPartitions, partitionFunc=None):
:param int numPartitions: Number of partitions.
:param function partitionFunc: Partition function.
:rtype: RDD
Example where even numbers get assigned to partition 0
and odd numbers to partition 1:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> rdd = sc.parallelize([1, 3, 2, 7, 8, 5], 1)
>>> keyvalue_rdd = rdd.map(lambda x: (x, x))
>>> keyvalue_rdd.partitionBy(2).keys().collect()
[2, 8, 1, 3, 7, 5]
"""

if partitionFunc is None:
Expand Down Expand Up @@ -1057,6 +1068,40 @@ def repartition(self, numPartitions):
"""
return self.context.parallelize(self.toLocalIterator(), numPartitions)

def repartitionAndSortWithinPartitions(
        self, numPartitions=None, partitionFunc=None,
        ascending=True, keyfunc=None,
):
    """Repartition the RDD, then sort each resulting partition locally.

    :param int numPartitions: Number of partitions in the new RDD.
    :param partitionFunc: Function mapping a key to a partition index.
    :param ascending: Sort order within each partition. Default is True.
    :param keyfunc: Extracts the sort key from each element.
    :rtype: RDD

    Example where even numbers are assigned to partition 0 and odd numbers
    to partition 1 and then the partitions are sorted individually:

    >>> import pysparkling
    >>> sc = pysparkling.Context()
    >>> rdd = sc.parallelize([1, 3, 2, 7, 8, 5], 1)
    >>> kv_rdd = rdd.map(lambda x: (x, x))
    >>> processed = kv_rdd.repartitionAndSortWithinPartitions(2)
    >>> processed.keys().collect()
    [2, 8, 1, 3, 5, 7]
    """
    # Bind the sort direction once; the closure reuses it per partition.
    descending = not ascending

    def sort_one_partition(elements):
        # Each partition is sorted independently; no global order results.
        return sorted(elements, key=keyfunc, reverse=descending)

    repartitioned = self.partitionBy(numPartitions, partitionFunc)
    return repartitioned.mapPartitions(sort_one_partition)

def rightOuterJoin(self, other, numPartitions=None):
"""right outer join
Expand All @@ -1070,9 +1115,10 @@ def rightOuterJoin(self, other, numPartitions=None):
Example:
>>> from pysparkling import Context
>>> rdd1 = Context().parallelize([(0, 1), (1, 1)])
>>> rdd2 = Context().parallelize([(2, 1), (1, 3)])
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> rdd1 = sc.parallelize([(0, 1), (1, 1)])
>>> rdd2 = sc.parallelize([(2, 1), (1, 3)])
>>> sorted(rdd1.rightOuterJoin(rdd2).collect())
[(1, (1, 3)), (2, (None, 1))]
"""
Expand Down
9 changes: 8 additions & 1 deletion scripts/pyspark_comparisons.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,17 @@ def stat():
print(str(s2))


def partition_by():
    """Exercise partitionBy on a small key-value RDD and print the result."""
    # NOTE(review): SC is presumably a module-level SparkContext — defined
    # outside this hunk.
    pairs = SC.parallelize(range(20), 2).map(lambda n: (n, n))
    partitioned = pairs.partitionBy(2).collect()
    print('>>>>>>', partitioned)


if __name__ == '__main__':
    # Scratch driver: toggle individual comparison runs by
    # (un)commenting the calls below. Only partition_by() is active,
    # matching this commit's intent; the duplicate uncommented
    # create_pickled_files() line left over from the diff was removed.
    # simple_textFile()
    # lazy_execution()
    # count_lines()
    # create_key_value_txt()
    # create_pickled_files()
    # stat()
    partition_by()

0 comments on commit dbd4006

Please sign in to comment.