[SPARK-1308] Add getNumPartitions to pyspark RDD
Add getNumPartitions to pyspark RDD to provide an intuitive way to get the number of partitions in an RDD, as we can already do in Scala today.

Author: Syed Hashmi <shashmi@cloudera.com>

Closes apache#995 from syedhashmi/master and squashes the following commits:

de0ed5e [Syed Hashmi] [SPARK-1308] Add getNumPartitions to pyspark RDD
Syed Hashmi authored and rxin committed Jun 9, 2014
1 parent 32ee9f0 commit 6113ac1
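For context, here is a minimal usage sketch of the new method (not part of the commit itself). It assumes a live SparkContext named sc, just as the doctests in the diff below do; the local master URL and application name are illustrative only.

from pyspark import SparkContext

sc = SparkContext("local", "getNumPartitions-example")  # illustrative context

rdd = sc.parallelize([1, 2, 3, 4], 2)   # explicitly request 2 partitions
print(rdd.getNumPartitions())           # -> 2, via the new public API
print(rdd._jrdd.splits().size())        # the Java-side call the new method wraps

sc.stop()

Before this change, PySpark users had to reach into the private _jrdd handle as in the last print above; getNumPartitions exposes the same information through the public RDD API, matching what Scala users already have.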
Showing 1 changed file with 27 additions and 18 deletions: python/pyspark/rdd.py
@@ -250,7 +250,7 @@ def getCheckpointFile(self):
def map(self, f, preservesPartitioning=False):
"""
Return a new RDD by applying a function to each element of this RDD.
>>> rdd = sc.parallelize(["b", "a", "c"])
>>> sorted(rdd.map(lambda x: (x, 1)).collect())
[('a', 1), ('b', 1), ('c', 1)]
@@ -312,6 +312,15 @@ def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
"use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
return self.mapPartitionsWithIndex(f, preservesPartitioning)

def getNumPartitions(self):
"""
Returns the number of partitions in RDD
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> rdd.getNumPartitions()
2
"""
return self._jrdd.splits().size()

def filter(self, f):
"""
Return a new RDD containing only the elements that satisfy a predicate.
@@ -413,9 +422,9 @@ def union(self, other):

def intersection(self, other):
"""
Return the intersection of this RDD and another one. The output will not
Return the intersection of this RDD and another one. The output will not
contain any duplicate elements, even if the input RDDs did.
Note that this method performs a shuffle internally.
>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
@@ -571,14 +580,14 @@ def foreachPartition(self, f):
"""
Applies a function to each partition of this RDD.
>>> def f(iterator):
... for x in iterator:
... print x
>>> def f(iterator):
... for x in iterator:
... print x
... yield None
>>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
"""
self.mapPartitions(f).collect() # Force evaluation

def collect(self):
"""
Return a list that contains all of the elements in this RDD.
@@ -673,7 +682,7 @@ def func(iterator):
yield acc

return self.mapPartitions(func).fold(zeroValue, combOp)


def max(self):
"""
@@ -692,7 +701,7 @@ def min(self):
1.0
"""
return self.reduce(min)

def sum(self):
"""
Add up the elements in this RDD.
@@ -786,7 +795,7 @@ def mergeMaps(m1, m2):
m1[k] += v
return m1
return self.mapPartitions(countPartition).reduce(mergeMaps)

def top(self, num):
"""
Get the top N elements from a RDD.
@@ -814,7 +823,7 @@ def merge(a, b):
def takeOrdered(self, num, key=None):
"""
Get the N elements from a RDD ordered in ascending order or as specified
by the optional key function.
by the optional key function.
>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
[1, 2, 3, 4, 5, 6]
@@ -834,7 +843,7 @@ def unKey(x, key_=None):
if key_ != None:
x = [i[1] for i in x]
return x

def merge(a, b):
return next(topNKeyedElems(a + b))
result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge)
@@ -1169,21 +1178,21 @@ def _mergeCombiners(iterator):
combiners[k] = mergeCombiners(combiners[k], v)
return combiners.iteritems()
return shuffled.mapPartitions(_mergeCombiners)

def foldByKey(self, zeroValue, func, numPartitions=None):
"""
Merge the values for each key using an associative function "func" and a neutral "zeroValue"
which may be added to the result an arbitrary number of times, and must not change
the result (e.g., 0 for addition, or 1 for multiplication.).
which may be added to the result an arbitrary number of times, and must not change
the result (e.g., 0 for addition, or 1 for multiplication.).
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> from operator import add
>>> rdd.foldByKey(0, add).collect()
[('a', 2), ('b', 1)]
"""
return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions)


# TODO: support variant with custom partitioner
def groupByKey(self, numPartitions=None):
"""
@@ -1302,7 +1311,7 @@ def keyBy(self, f):
def repartition(self, numPartitions):
"""
Return a new RDD that has exactly numPartitions partitions.
Can increase or decrease the level of parallelism in this RDD. Internally, this uses
a shuffle to redistribute data.
If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
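The repartition docstring above (shown truncated in this diff) recommends coalesce when reducing the number of partitions. Here is a short, illustrative sketch of how getNumPartitions makes such changes easy to verify, again assuming a SparkContext named sc:

rdd = sc.parallelize(range(100), 4)
print(rdd.getNumPartitions())       # -> 4

more = rdd.repartition(8)           # full shuffle to grow to 8 partitions
print(more.getNumPartitions())      # -> 8

fewer = rdd.coalesce(2)             # shrink without a full shuffle
print(fewer.getNumPartitions())     # -> 2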
