From c018887b354f4cb65154654f9ea77833a6d59aa5 Mon Sep 17 00:00:00 2001
From: Syed Hashmi
Date: Mon, 9 Jun 2014 00:08:40 -0700
Subject: [PATCH] [SPARK-1308] Add getNumPartitions to pyspark RDD

Add getNumPartitions to pyspark RDD to provide an intuitive way to get
the number of partitions of an RDD, as we can already do in Scala today.

Author: Syed Hashmi

Closes #995 from syedhashmi/master and squashes the following commits:

de0ed5e [Syed Hashmi] [SPARK-1308] Add getNumPartitions to pyspark RDD
---
 python/pyspark/rdd.py | 45 ++++++++++++++++++++++++++-----------------
 1 file changed, 27 insertions(+), 18 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index ca0a95578fd28..9c69c79236edc 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -250,7 +250,7 @@ def getCheckpointFile(self):
     def map(self, f, preservesPartitioning=False):
         """
         Return a new RDD by applying a function to each element of this RDD.
-
+
         >>> rdd = sc.parallelize(["b", "a", "c"])
         >>> sorted(rdd.map(lambda x: (x, 1)).collect())
         [('a', 1), ('b', 1), ('c', 1)]
@@ -312,6 +312,15 @@ def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
                       "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
         return self.mapPartitionsWithIndex(f, preservesPartitioning)
 
+    def getNumPartitions(self):
+        """
+        Returns the number of partitions in RDD
+        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
+        >>> rdd.getNumPartitions()
+        2
+        """
+        return self._jrdd.splits().size()
+
     def filter(self, f):
         """
         Return a new RDD containing only the elements that satisfy a predicate.
@@ -413,9 +422,9 @@ def union(self, other):
 
     def intersection(self, other):
         """
-        Return the intersection of this RDD and another one. The output will not
+        Return the intersection of this RDD and another one. The output will not
         contain any duplicate elements, even if the input RDDs did.
-
+
         Note that this method performs a shuffle internally.
 
         >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
@@ -571,14 +580,14 @@ def foreachPartition(self, f):
         """
         Applies a function to each partition of this RDD.
 
-        >>> def f(iterator):
-        ...      for x in iterator:
-        ...           print x
+        >>> def f(iterator):
+        ...      for x in iterator:
+        ...           print x
         ...      yield None
         >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
         """
         self.mapPartitions(f).collect()  # Force evaluation
-
+
     def collect(self):
         """
         Return a list that contains all of the elements in this RDD.
@@ -673,7 +682,7 @@ def func(iterator):
             yield acc
 
         return self.mapPartitions(func).fold(zeroValue, combOp)
-
+
     def max(self):
         """
@@ -692,7 +701,7 @@ def min(self):
         1.0
         """
         return self.reduce(min)
-
+
     def sum(self):
         """
         Add up the elements in this RDD.
@@ -786,7 +795,7 @@ def mergeMaps(m1, m2):
                 m1[k] += v
             return m1
         return self.mapPartitions(countPartition).reduce(mergeMaps)
-
+
     def top(self, num):
         """
         Get the top N elements from a RDD.
@@ -814,7 +823,7 @@ def merge(a, b):
     def takeOrdered(self, num, key=None):
         """
         Get the N elements from a RDD ordered in ascending order or as specified
-        by the optional key function.
+        by the optional key function.
 
         >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
         [1, 2, 3, 4, 5, 6]
@@ -834,7 +843,7 @@ def unKey(x, key_=None):
             if key_ != None:
                 x = [i[1] for i in x]
             return x
-
+
         def merge(a, b):
             return next(topNKeyedElems(a + b))
         result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge)
@@ -1169,12 +1178,12 @@ def _mergeCombiners(iterator):
                 combiners[k] = mergeCombiners(combiners[k], v)
             return combiners.iteritems()
         return shuffled.mapPartitions(_mergeCombiners)
-
+
     def foldByKey(self, zeroValue, func, numPartitions=None):
         """
         Merge the values for each key using an associative function "func" and a neutral "zeroValue"
-        which may be added to the result an arbitrary number of times, and must not change
-        the result (e.g., 0 for addition, or 1 for multiplication.).
+        which may be added to the result an arbitrary number of times, and must not change
+        the result (e.g., 0 for addition, or 1 for multiplication.).
 
         >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
         >>> from operator import add
         >>> rdd.foldByKey(0, add).collect()
         [('a', 2), ('b', 1)]
         """
         return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions)
-
-
+
+
     # TODO: support variant with custom partitioner
     def groupByKey(self, numPartitions=None):
         """
@@ -1302,7 +1311,7 @@ def keyBy(self, f):
     def repartition(self, numPartitions):
         """
         Return a new RDD that has exactly numPartitions partitions.
-
+
         Can increase or decrease the level of parallelism in this RDD. Internally, this uses
         a shuffle to redistribute data.
         If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
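
For reference, here is the new method in use, in the same doctest style as the
patch. This is a minimal sketch, assuming a live SparkContext bound to `sc`
(as in pyspark's doctests); the `coalesce` call is only there to show the
partition count changing and is not part of this patch:

    >>> rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8], 4)
    >>> rdd.getNumPartitions()
    4
    >>> rdd.coalesce(2).getNumPartitions()
    2

As the diff shows, the method simply asks the underlying Java RDD for its
splits (`self._jrdd.splits().size()`), so it reads partition metadata without
launching a job.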