<strong>map</strong>(f, preservesPartitioning=False)<br/>
<strong>mapPartitions</strong>(f, preservesPartitioning=False)<br/>
<ul>
<li><strong>map</strong> will not change the number of elements in an RDD, while <strong>mapPartitions</strong> might very well do so.</li>
<li>The method <a href="https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=rdd#pyspark.RDD.map">map</a> returns a new distributed dataset formed by passing each <em>element</em> of the source through a function <i>func</i>. <a href="https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=rdd#pyspark.RDD.mapPartitions">mapPartitions</a> is similar to map, but runs separately on each <em>partition (block)</em> of the RDD, so <i>func</i> must be of type Iterator&lt;T&gt; =&gt; Iterator&lt;U&gt; when running on an RDD of type T.</li>
<li><strong>map</strong> applies the function once per element, while <strong>mapPartitions</strong> applies it once per partition.</li>
</ul>
<p><strong><em>Example Scenario</em></strong>: if a particular RDD partition holds 100K elements, <strong>map</strong> fires the mapping function 100K times. With <strong>mapPartitions</strong>, the function is called only once for that partition: it receives all 100K records and returns all results in a single call. This can give a performance gain, because <strong>map</strong> invokes the function once per element, which is costly when the function does something expensive on every call that it would only need to do once if all the elements were passed in together (as with <strong>mapPartitions</strong>).</p>
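The cost difference in this scenario can be illustrated without a cluster. The sketch below is plain Python, not Spark code: `expensive_setup`, `per_element`, `per_partition` and `partition` are hypothetical names, and the counter stands in for something costly such as opening a database connection.

```python
# Plain-Python model of the two call patterns; no Spark involved.
calls = {"setup": 0}

def expensive_setup():
    """Hypothetical stand-in for costly initialization, e.g. opening a connection."""
    calls["setup"] += 1
    return 10  # pretend resource; here just a multiplier

def per_element(x):
    # map-style: runs once per element, so the setup is repeated every time
    return x * expensive_setup()

def per_partition(iterator):
    # mapPartitions-style: runs once per partition, so the setup happens once
    factor = expensive_setup()
    for x in iterator:
        yield x * factor

partition = list(range(1000))  # one partition's worth of elements

map_result = [per_element(x) for x in partition]
map_setups = calls["setup"]

calls["setup"] = 0
part_result = list(per_partition(iter(partition)))
part_setups = calls["setup"]

print(map_setups, part_setups)  # 1000 1
```

Both styles produce the same values; only the number of setup calls differs.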

In [1]:
rdd = sc.parallelize([1,2,3,4,5],2)
rdd.map(lambda a:a if a!=2 else None).collect()

[1, None, 3, 4, 5]

In [2]:
def filter_out_2(partition):
    print("partition:")
    for element in partition:
        print(element)
        if element != 2:
            yield element
rdd.mapPartitions(filter_out_2).collect()

[1, 3, 4, 5]

<i>Shell output:</i><br/>partition:
3
4
5
partition:
1
2

<strong>mapValues</strong>(f)<br/>
&emsp;&emsp;Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD’s partitioning.

In [3]:
rdd = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
def f(x): return len(x)
rdd.mapValues(f).collect()

[('a', 3), ('b', 1)]

<strong>groupByKey</strong>(numPartitions=None, partitionFunc=&lt;function portable_hash&gt;)<br/>
&emsp;&emsp;When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable&lt;V&gt;) pairs. 
<p><b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using <em>reduceByKey</em> or <em>aggregateByKey</em> will yield much better performance.
<br/>
<b>Note:</b> By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass the optional <em>numPartitions</em> argument to set a different number of tasks.</p>

In [4]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.groupByKey().mapValues(len).collect()

[('b', 1), ('a', 2)]

In [5]:
rdd.groupByKey().mapValues(list).collect()

[('b', [1]), ('a', [1, 1])]

Using <b>reduceByKey</b> instead, this becomes:

In [6]:
from operator import add
rdd.reduceByKey(add).collect()

[('b', 1), ('a', 2)]

<strong>reduceByKey</strong>(func, numPartitions=None, partitionFunc=&lt;function portable_hash&gt;)<br/>
&emsp;&emsp;When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function <i>func</i>, which must be of type (V,V) =&gt; V. As with <em>groupByKey</em>, the number of reduce tasks is configurable through an optional second argument.

In [7]:
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.reduceByKey(add).collect()

[('b', 1), ('a', 2)]
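The performance note under groupByKey comes down to how many records have to cross the shuffle: reduceByKey merges values for the same key inside each partition before shuffling, while groupByKey ships every record. The sketch below is a plain-Python model of that idea, not Spark's implementation; the partition layout and the function names `group_by_key_model` and `reduce_by_key_model` are assumptions made for illustration.

```python
from collections import defaultdict
from operator import add

# Assumed partition layout (hypothetical); Spark decides the real one.
partitions = [[("a", 1), ("b", 1), ("a", 1)], [("a", 1), ("b", 1)]]

def group_by_key_model(partitions):
    # every (key, value) record is shuffled, then grouped by key
    shuffled = [kv for part in partitions for kv in part]
    groups = defaultdict(list)
    for k, v in shuffled:
        groups[k].append(v)
    return dict(groups), len(shuffled)

def reduce_by_key_model(partitions, func):
    # step 1: combine inside each partition, leaving one record per key per partition
    shuffled = []
    for part in partitions:
        local = {}
        for k, v in part:
            local[k] = func(local[k], v) if k in local else v
        shuffled.extend(local.items())
    # step 2: merge the partial results after the shuffle
    merged = {}
    for k, v in shuffled:
        merged[k] = func(merged[k], v) if k in merged else v
    return merged, len(shuffled)

groups, n_group = group_by_key_model(partitions)
sums, n_reduce = reduce_by_key_model(partitions, add)
print({k: sum(v) for k, v in groups.items()}, n_group)  # {'a': 3, 'b': 2} 5
print(sums, n_reduce)                                   # {'a': 3, 'b': 2} 4
```

With only five records the saving is one record; the gap grows with the number of repeated keys per partition.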

<strong>aggregate</strong>(zeroValue, seqOp, combOp)<br/>
&emsp;&emsp;Aggregate the elements of each partition, and then the results for all the partitions, using the given combine functions and a neutral “zero value.”<br/>
&emsp;&emsp;The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.<br/>
&emsp;&emsp;The first function (seqOp) can return a different result type, U, than the type of this RDD. Thus, we need one operation for merging a T into a U and one operation for merging two U's.
<p>&emsp;&emsp;The documentation gives the following example:</p>

In [8]:
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)

(10, 4)

To make this easier to follow, the code is modified as follows:

In [9]:
def seqOp(x,y):
    if x ==(0,0):
        print("partition-----")
    print("seq_x:",x)
    print("seq_y:",y)
    z = (x[0] + y,x[1] + 1)
    print("seq_z:",z)
    return z
def combOp(x,y):
    print("comb_x",x)
    print("comb_y",y)
    z = (x[0] + y[0],x[1] + y[1])
    print("comb_z:",z)
    return z

In [10]:
sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)

comb_x (0, 0)
comb_y (3, 2)
comb_z: (3, 2)
comb_x (3, 2)
comb_y (7, 2)
comb_z: (10, 4)


(10, 4)

<i>Shell output:</i><br/>
partition-----<br/>
seq_x: (0, 0)<br/>
seq_y: 3<br/>
seq_z: (3, 1)<br/>
seq_x: (3, 1)<br/>
seq_y: 4<br/>
seq_z: (7, 2)<br/>
partition-----<br/>
seq_x: (0, 0)<br/>
seq_y: 1<br/>
seq_z: (1, 1)<br/>
seq_x: (1, 1)<br/>
seq_y: 2<br/>
seq_z: (3, 2)<br/>

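<p>&emsp;&emsp;The output above shows that 1 and 2 were placed in one partition and 3 and 4 in the other (the order in which the partitions print depends on task scheduling). In the partition holding 3 and 4, zeroValue (0, 0) and the first element 3 are passed to seqOp, which returns (3, 1); then (3, 1) and the second element 4 are passed to seqOp, which returns (7, 2). Likewise, in the partition holding 1 and 2, zeroValue (0, 0) and the first element 1 are passed to seqOp, which returns (1, 1); then (1, 1) and the second element 2 are passed to seqOp, which returns (3, 2). Finally, combOp combines zeroValue (0, 0) with the two partition results: first (0, 0) and the result (3, 2) of the partition holding 1 and 2 are passed to combOp, which returns (3, 2); then (3, 2) and the result (7, 2) of the partition holding 3 and 4 are passed to combOp, which returns the final result (10, 4).</p><p>&emsp;&emsp;To summarize <strong>aggregate</strong>:<br/>
&emsp;&emsp;def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U<br/>
&emsp;&emsp;aggregate is used to aggregate the elements of an RDD: it first uses seqOp to fold the elements of type T in each partition into a value of type U, then uses combOp to merge the per-partition values of type U into a single U. Note in particular that both seqOp and combOp use zeroValue, whose type is U.<br/>&emsp;&emsp;The initial value and the first element of the first partition are passed to seqOp, then the result and the second element are passed to seqOp, and so on up to the last element. The second partition is processed in the same way. Finally, the results of all partitions are combined with combOp (the first two values are combined, then that result and the next value are passed to combOp, and so on), and the final result is returned.</p>
<p>&emsp;&emsp;Here is another example:</p>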

In [11]:
rdd = sc.parallelize((1,2,3,4,5,6),2)
def seq(a,b):
    print ('seqOp:'+str(a)+"\t"+str(b))
    return min(a,b)
def comb(a,b):
    print ('comOp'+str(a)+"\t"+str(b))
    return a+b
rdd.aggregate(3,seq,comb)

comOp3	1
comOp4	3


7

Shell output:<br/>
seqOp:3	1<br/>
seqOp:1	2<br/>
seqOp:1	3<br/>
seqOp:3	4<br/>
seqOp:3	5<br/>
seqOp:3	6<br/>
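The two traces above follow the same pattern, which can be modeled in a few lines of plain Python. This is only a sketch of the semantics, not Spark's implementation: `aggregate_model` is a hypothetical name, and the partition layouts are assumed to match the ones seen in the traces.

```python
from functools import reduce

def aggregate_model(partitions, zero_value, seq_op, comb_op):
    # fold each partition separately, each fold starting from zero_value
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    # then fold the per-partition results, again starting from zero_value
    return reduce(comb_op, partials, zero_value)

# Example 1: (sum, count) over [1, 2, 3, 4], assumed split into two partitions
seq_op = lambda acc, x: (acc[0] + x, acc[1] + 1)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])
r1 = aggregate_model([[1, 2], [3, 4]], (0, 0), seq_op, comb_op)

# Example 2: min within partitions, sum across partitions, zero value 3
r2 = aggregate_model([[1, 2, 3], [4, 5, 6]], 3, min, lambda a, b: a + b)

# Same data in a single partition: the non-neutral zero value changes the result
r3 = aggregate_model([[1, 2, 3, 4, 5, 6]], 3, min, lambda a, b: a + b)

print(r1, r2, r3)  # (10, 4) 7 4
```

Because the zero value enters once per partition and once more in the final combine, a value that is not neutral (such as 3 here) makes the result depend on the number of partitions.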

<strong>aggregateByKey</strong>(zeroValue, seqFunc, combFunc, numPartitions=None, partitionFunc=&lt;function portable_hash&gt;)<br/>
&emsp;&emsp;Aggregate the values of each key, using given combine functions and a neutral “zero value”. This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U’s. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.<br/>&emsp;&emsp;This function aggregates the values that share a key in a pair RDD, again using a neutral initial value. As with aggregate, the result type of aggregateByKey does not have to match the type of the values in the RDD. Because aggregateByKey aggregates values per key, it still returns a pair RDD of keys and aggregated values, whereas aggregate returns a plain (non-RDD) result; keep this difference in mind. The Scala implementation defines three aggregateByKey prototypes, all of which end up calling the same function.
<ol>
<li>def aggregateByKey[U:ClassTag]\(zeroValue:U, partitioner:Partitioner\)<br/>(seqOp:(U, V) => U, combOp:(U, U) => U):RDD[(K, U)]</li>
<li>def aggregateByKey[U:ClassTag]\(zeroValue:U, numPartitions:Int\)<br/>(seqOp:(U, V) => U, combOp:(U, U) => U):RDD[(K, U)]</li>
<li>def aggregateByKey[U:ClassTag]\(zeroValue:U\)<br/>(seqOp:(U, V) => U, combOp:(U, U) => U):RDD[(K, U)]</li>
</ol>
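<br/>&emsp;&emsp;The first aggregateByKey lets you supply a custom Partitioner. Apart from that parameter, its declaration is very similar to that of aggregate; the other aggregateByKey variants end up calling this one.<br/>
&emsp;&emsp;The second aggregateByKey lets you set the number of partitions (numPartitions) and ends up using a HashPartitioner.<br/>
&emsp;&emsp;The third aggregateByKey first checks whether the current RDD defines a partitioner; if it does, that partitioner is used, otherwise a HashPartitioner is used.<br/><br/>
&emsp;&emsp;In this function:<br/>
&emsp;&emsp;U: ClassTag ==> the type of the values in the resulting RDD.<br/>
&emsp;&emsp;zeroValue: U ==> used to create a value of the result type the first time a key is encountered in each partition. It is wrapped in a function that first produces a value of the result type; seqOp is then called to add the first value for that key to this variable of type U.<br/>
&emsp;&emsp;seqOp: (U,V) => U ==> adds the values for a key, as the partition is iterated, to the U instance created from zeroValue.<br/>
&emsp;&emsp;combOp: (U,U) => U ==> merges two U values that were aggregated in different partitions.</p>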

In [12]:
data = sc.parallelize([(1,3),(1,2),(1,4),(2,3)])
def seq(a,b):
    if a == 1:
        print("partition:")
    print("seq_a:",a)
    print("seq_b:",b)
    return max(a,b)
def comb(a,b):
    print("comb_a:",a)
    print("comb_b:",b)
    return a+b
data.aggregateByKey(1,seq,comb).collect()

[(2, 3), (1, 7)]

<em>shell output:</em><br/>
partition:<br/>
seq_a: 1<br/>
seq_b: 3<br/>
seq_a: 3<br/>
seq_b: 2<br/>
partition:<br/>
seq_a: 1<br/>
seq_b: 4<br/>
partition:<br/>
seq_a: 1<br/>
seq_b: 3<br/>
comb_a: 3<br/>
comb_b: 4<br/>
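The trace above can be reproduced with a small plain-Python model. It is a sketch of the semantics only, not Spark's implementation: `aggregate_by_key_model` is a hypothetical name, and the partition layout is an assumption chosen to be consistent with the printed output (Spark decides the real layout).

```python
def aggregate_by_key_model(partitions, zero_value, seq_func, comb_func):
    # within each partition, fold the values per key, starting each key from zero_value
    # (fine for an immutable zero_value; a mutable one would need a copy per key)
    partials = []
    for part in partitions:
        local = {}
        for k, v in part:
            local[k] = seq_func(local.get(k, zero_value), v)
        partials.append(local)
    # across partitions, merge the per-key partial results with comb_func
    merged = {}
    for local in partials:
        for k, u in local.items():
            merged[k] = comb_func(merged[k], u) if k in merged else u
    return sorted(merged.items())

# Assumed layout: key 1 appears in two partitions, key 2 in one
partitions = [[(1, 3), (1, 2)], [(1, 4)], [(2, 3)]]
result = aggregate_by_key_model(partitions, 1, max, lambda a, b: a + b)
print(result)  # [(1, 7), (2, 3)]
```

In this model the zero value is used only when a key is first seen in a partition, not in the merge step, which matches the trace: comb is called once, with 3 and 4.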

<strong>foreach</strong>(f)<br/>
&emsp;&emsp;Applies a function to all elements of this RDD.

In [18]:
def f(x): print(x)
sc.parallelize([1, 2, 3, 4, 5]).foreach(f)

<em>shell output:</em><br/>
1<br/>
2<br/>
3<br/>
4<br/>
5<br/>

<strong>groupBy</strong>(f, numPartitions=None, partitionFunc=&lt;function portable_hash&gt;)<br/>
&emsp;&emsp;Return an RDD of grouped items.

In [20]:
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
result = rdd.groupBy(lambda x: x % 2).collect()

In [21]:
result

[(0, <pyspark.resultiterable.ResultIterable at 0x7f8fec386400>),
 (1, <pyspark.resultiterable.ResultIterable at 0x7f8fec3867f0>)]

In [22]:
[(x, sorted(y)) for (x, y) in result]

[(0, [2, 8]), (1, [1, 1, 3, 5])]
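What groupBy computes can be modeled in plain Python: the function f produces a key for each element, and elements with the same key end up in the same group. This is a sketch of the result only, with the hypothetical name `group_by_model`; it ignores partitioning and the lazy ResultIterable that Spark returns.

```python
from collections import defaultdict

def group_by_model(elements, f):
    # key each element by f(x) and collect elements that share a key
    groups = defaultdict(list)
    for x in elements:
        groups[f(x)].append(x)
    # sort keys and values so the output is deterministic
    return sorted((k, sorted(v)) for k, v in groups.items())

result_model = group_by_model([1, 1, 2, 3, 5, 8], lambda x: x % 2)
print(result_model)  # [(0, [2, 8]), (1, [1, 1, 3, 5])]
```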