# RDD Operators
Initialize Spark

In [1]:
%%init_spark
launcher.master = "local[6]"

In [2]:
spark

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.216.1:4040
SparkContext available as 'sc' (version = 2.1.1, master = local[6], app id = local-1559487109383)
SparkSession available as 'spark'


res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@561d9114


In [3]:
spark

res1: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@561d9114


In [4]:
sc

res2: org.apache.spark.SparkContext = org.apache.spark.SparkContext@5ccc3144


## Creating RDDs
### sc.parallelize
```scala
  def parallelize[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T]
```

### sc.makeRDD
```scala
  /** Distribute a local Scala collection to form an RDD.
   *
   * This method is identical to `parallelize`.
   */
  def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    parallelize(seq, numSlices)
  }
```
### textFile
```scala
  def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String]
```

In [5]:
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25


In [6]:
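// Check the number of partitions of this RDD. By default it equals the number of CPU cores allocated to the application; it can also be set when the RDD is created.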
 rdd1.partitions.length

res3: Int = 6


In [7]:
val rdd1 = sc.parallelize(Array(1,2,3.4),3)
rdd1.partitions.length

rdd1: org.apache.spark.rdd.RDD[Double] = ParallelCollectionRDD[1] at parallelize at <console>:27
res4: Int = 3


## Transformation
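All RDD transformations are lazy: they do not compute a result right away. Instead, Spark only records the transformations applied to the base dataset (for example, a file). The transformations actually run only when an action requires a result to be returned to the driver. This design lets Spark run more efficiently.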
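
The lazy behaviour can be illustrated without Spark. The sketch below is only a local analogy: a plain Scala `Iterator` stands in for an RDD, `map` plays the role of a transformation, and `toList` plays the role of an action. The counter `calls` is a hypothetical helper added to make the evaluation order visible.

```scala
// Local analogy only: no Spark is involved. A Scala Iterator defers work
// in the same spirit as an RDD transformation.
var calls = 0

// "Transformation": the function is recorded, but nothing is computed yet.
val doubled = Iterator(1, 2, 3).map { x => calls += 1; x * 2 }
val callsBeforeAction = calls

// "Action": forcing the iterator finally runs the recorded function.
val result = doubled.toList
val callsAfterAction = calls
```

Here `callsBeforeAction` stays at 0 because the mapping function has not run, and it runs once per element only when `toList` forces the result.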

### map(func)
```scala
def map[U: ClassTag](f: T => U): RDD[U]
```
Returns a new RDD formed by passing each element of the source through the function func.

In [8]:
val rdd = sc.parallelize(1 to 10)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:25


In [9]:
rdd.collect()

res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


In [10]:
val mapRdd = rdd.map(_ * 2)

mapRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:27


In [11]:
mapRdd.collect

res6: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)


### filter(func)
```scala
def filter(f: T => Boolean): RDD[T]
```
Returns a new RDD containing only those source elements for which func returns true.

In [12]:
val rdd = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))

rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:25


In [13]:
val filterRdd = rdd.filter(_.contains("xiao"))

filterRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at filter at <console>:27


In [14]:
filterRdd.collect

res7: Array[String] = Array(xiaoming, xiaojiang, xiaohe)


### flatMap(func)
```scala
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
```
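Applies the function to every element of the RDD and builds a new RDD from the contents of all the returned collections. It is commonly used to split text into words. It behaves like a map followed by a flatten.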
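
The "map, then flatten" reading can be checked on an ordinary Scala collection. This is a local sketch using `List` rather than an RDD, with the same function shape as the notebook cell (`1 to n` for each `n`).

```scala
// Local sketch on a Scala List (not an RDD): flatMap equals map followed by flatten.
val xs = (1 to 5).toList

val viaFlatMap = xs.flatMap(n => (1 to n).toList)
val viaMapThenFlatten = xs.map(n => (1 to n).toList).flatten
```

Both values hold the same elements in the same order, which mirrors what the RDD version collects.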

In [15]:
val rdd = sc.parallelize(1 to 5)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:25


In [16]:
val  flatMapRdd = rdd.flatMap(1 to _)

flatMapRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at flatMap at <console>:27


In [17]:
flatMapRdd.collect

res8: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)


### distinct([numTasks])
```scala
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

  def distinct(): RDD[T] = withScope {
    distinct(partitions.length)
  }
```
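Removes duplicates: returns a new RDD containing the distinct elements of the source RDD. By default the result has as many partitions as the source RDD (the no-argument overload calls `distinct(partitions.length)`); pass the optional numTasks argument to change this.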

In [18]:
val rdd = sc.parallelize(List(1,2,1,5,2,9,6,1))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:25


In [19]:
val distinctRdd  = rdd.distinct

distinctRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[11] at distinct at <console>:27


In [20]:
distinctRdd.collect

res9: Array[Int] = Array(6, 1, 2, 9, 5)


In [21]:
rdd.distinct(2).collect

res10: Array[Int] = Array(6, 2, 1, 9, 5)


As the output shows, the order of the elements differs from the original order.
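
One plausible explanation for the changed order is that distinct involves a shuffle that places each element in a partition chosen by its hash code. The sketch below is a local model under that assumption, not Spark code: `nonNegativeMod` and `distinctByHash` are hypothetical helpers, and the order of elements inside a single partition is not modelled faithfully.

```scala
// Local model (assumption): elements are bucketed by hashCode modulo the
// number of partitions, then partitions are read back in index order.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

def distinctByHash[T](data: Seq[T], numPartitions: Int): Seq[T] = {
  val buckets = data.distinct.groupBy(x => nonNegativeMod(x.hashCode, numPartitions))
  (0 until numPartitions).flatMap(i => buckets.getOrElse(i, Seq.empty))
}

// Same data as the notebook cell, with 6 partitions as in local[6].
val modelled = distinctByHash(List(1, 2, 1, 5, 2, 9, 6, 1), 6)
```

With six partitions each bucket holds at most one distinct value, so the model yields 6, 1, 2, 9, 5, the same order as the first distinct output above. With two partitions several values share a bucket and the order within a bucket is not guaranteed, so the model is not expected to match there.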

### union(otherRdd)
```scala
def union(other: RDD[T]): RDD[T]
```
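Returns the union: an RDD containing all elements of both RDDs. Note that the element types must match. **Duplicates are not removed, so elements present in both RDDs appear more than once.**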

In [22]:
val rdd1 = sc.parallelize(1 to 5)
val rdd2 = sc.parallelize(3 to 7)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:25
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:26


In [23]:
rdd1.union(rdd2).collect

res11: Array[Int] = Array(1, 2, 3, 4, 5, 3, 4, 5, 6, 7)


### intersection(otherRdd)
```scala
def intersection(other: RDD[T]): RDD[T]

def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

def intersection(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
  intersection(other, new HashPartitioner(numPartitions))
}                
```
Returns the intersection: an RDD of the elements common to both RDDs. **The output contains no duplicates.**

In [24]:
rdd1.intersection(rdd2).collect

res12: Array[Int] = Array(3, 4, 5)


In [25]:
rdd1.union(rdd2).intersection(rdd1).collect

res13: Array[Int] = Array(1, 2, 3, 4, 5)


### subtract(otherRdd)
```scala
def subtract(other: RDD[T]): RDD[T]               
```
Removes from one RDD the elements that appear in the other (for example, to remove training data). **The output may contain duplicates.**

In [26]:
val subRdd = sc.makeRDD(List(1,1,2,3,3,8)).subtract(sc.makeRDD(List(3,4,5)))

subRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[36] at subtract at <console>:25


In [27]:
subRdd.collect

res14: Array[Int] = Array(1, 1, 2, 8)


### cartesian(otherRdd)
```scala
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]           
```
Cartesian product.

In [28]:
val cartesianRdd = sc.makeRDD(List(1,1,2,3,3)).cartesian(sc.makeRDD(List("a","b","c")))

cartesianRdd: org.apache.spark.rdd.RDD[(Int, String)] = CartesianRDD[39] at cartesian at <console>:25


In [29]:
cartesianRdd.collect

res15: Array[(Int, String)] = Array((1,a), (1,b), (1,c), (1,a), (1,b), (1,c), (2,a), (2,b), (2,c), (3,a), (3,b), (3,c), (3,a), (3,b), (3,c))


In [30]:
cartesianRdd.collect.foreach{case (a,b) => println(a,b)}

(1,a)
(1,b)
(1,c)
(1,a)
(1,b)
(1,c)
(2,a)
(2,b)
(2,c)
(3,a)
(3,b)
(3,c)
(3,a)
(3,b)
(3,c)


In [31]:
val cartesianRdd = sc.makeRDD(List(1,1,2,3,3)).cartesian(sc.makeRDD(List(21,22,23)))

cartesianRdd: org.apache.spark.rdd.RDD[(Int, Int)] = CartesianRDD[42] at cartesian at <console>:25


In [32]:
cartesianRdd.collect

res17: Array[(Int, Int)] = Array((1,21), (1,22), (1,23), (1,21), (1,22), (1,23), (2,21), (2,22), (2,23), (3,21), (3,22), (3,23), (3,21), (3,22), (3,23))


### sortBy(func, [ascending], [numTasks])
```scala
  //Return this RDD sorted by the given key function.
  def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]          
```
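Returns a new RDD sorted by the key that func computes for each element. (ascending defaults to true, which sorts in ascending order; pass false for descending order.)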

In [33]:
val rdd = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[43] at parallelize at <console>:25


In [34]:
rdd.sortBy(x => x).collect

res18: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


In [35]:
rdd.sortBy(x => x.toString).collect

res19: Array[Int] = Array(1, 10, 2, 3, 4, 5, 6, 7, 8, 9)


In [36]:
rdd.sortBy(x => x,false).collect

res20: Array[Int] = Array(10, 9, 8, 7, 6, 5, 4, 3, 2, 1)


### mapPartitions(func)
```scala
  //Return a new RDD by applying a function to each partition of this RDD.
  def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U]      
```
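Operates on each partition as a whole. func receives an Iterator over the elements of one partition of the input RDD and must return an Iterator over the results.

It is similar to map but runs separately on each partition (slice) of the RDD, so on an RDD of type T, func must have type Iterator[T] => Iterator[U]. With N elements in M partitions, the function passed to map is called N times, whereas the function passed to mapPartitions is called M times, each call processing all the elements of one partition.

preservesPartitioning indicates whether the returned RDD keeps the partitioner. Set it to true only when the RDD is a key-value RDD and the keys are not modified. Non-key-value RDDs generally have no partitioner, and once the keys of a key-value RDD are modified, the elements no longer satisfy the partitioner's placement. In those cases it must be false, indicating that the returned RDD is not partitioned by a partitioner.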
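
The N-calls versus M-calls difference can be counted on a local model. In the sketch below the six partitions of `1 to 20` are hard-coded as an assumption (a `Seq[Seq[Int]]` stands in for the RDD), and the two counters are hypothetical helpers.

```scala
// Local model: a Seq of partitions stands in for an RDD with 6 partitions.
// The split of 1 to 20 is an assumption, hard-coded for illustration.
val partitions: Seq[Seq[Int]] = Seq(
  Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9, 10),
  Seq(11, 12, 13), Seq(14, 15, 16), Seq(17, 18, 19, 20))

var mapCalls = 0
var mapPartitionsCalls = 0

// map-style: the function runs once per element (N = 20 calls).
val doubled = partitions.map(p => p.map { x => mapCalls += 1; x * 2 })

// mapPartitions-style: the function runs once per partition (M = 6 calls)
// and consumes that partition's iterator.
val sums = partitions.map { p => mapPartitionsCalls += 1; p.iterator.sum }
```

Because the per-partition function runs only M times, mapPartitions is a natural place for work with a fixed setup cost, such as opening a connection once per partition.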

In [37]:
val rdd = sc.parallelize(1 to 20)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[59] at parallelize at <console>:25


In [38]:
val func = (iter:Iterator[Int]) =>{
  var sum = 0
  while(iter.hasNext){
    sum += iter.next
  }
  List(sum).iterator
}

func: Iterator[Int] => Iterator[Int] = <function1>


In [39]:
rdd.mapPartitions(func).collect

res21: Array[Int] = Array(6, 15, 34, 36, 45, 74)


In [40]:
def func1(iter:Iterator[Int]):Iterator[Int] ={
  var sum = 0
  while(iter.hasNext){
    sum += iter.next
  }
  List(sum).iterator
}

func1: (iter: Iterator[Int])Iterator[Int]


In [41]:
rdd.mapPartitions(func1).collect

res22: Array[Int] = Array(6, 15, 34, 36, 45, 74)


In [42]:
rdd.mapPartitions{iter => List(iter.sum).iterator}.collect

res23: Array[Int] = Array(6, 15, 34, 36, 45, 74)


In [43]:
rdd.mapPartitions{iter => List(iter.max).iterator}.collect

res24: Array[Int] = Array(3, 6, 10, 13, 16, 20)


In [44]:
val rdd = sc.parallelize( List(("kpop","female"),("zorro","male"),("mobin","male"),("lucy","female")) )

rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[64] at parallelize at <console>:25


In [45]:
rdd.mapPartitions{iter => iter.filter{case (_,v) => v == "female"}}.collect

res25: Array[(String, String)] = Array((kpop,female), (lucy,female))


### mapPartitionsWithIndex(func)
```scala
  //Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
  def mapPartitionsWithIndex[U: ClassTag](
      f: (Int, Iterator[T]) => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U]      
```
Similar to mapPartitions, but func takes an additional integer argument giving the index of the partition, so on an RDD of type T, func must have type (Int, Iterator[T]) => Iterator[U].

In [46]:
val rdd = sc.parallelize(1 to 20)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[66] at parallelize at <console>:25


In [47]:
rdd.mapPartitionsWithIndex( (index,iter) => List((index,iter.toList)).toIterator ).collect

res26: Array[(Int, List[Int])] = Array((0,List(1, 2, 3)), (1,List(4, 5, 6)), (2,List(7, 8, 9, 10)), (3,List(11, 12, 13)), (4,List(14, 15, 16)), (5,List(17, 18, 19, 20)))


In [48]:
val rdd = sc.parallelize(1 to 4)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[68] at parallelize at <console>:25


In [49]:
rdd.mapPartitionsWithIndex( (index,iter) => List((index,iter.toList)).toIterator ).collect

res27: Array[(Int, List[Int])] = Array((0,List()), (1,List(1)), (2,List(2)), (3,List()), (4,List(3)), (5,List(4)))


As the output shows, the function is also called for empty partitions: it is called once per partition.
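
The empty partitions come from how a local sequence is cut into slices. The sketch below is a local model with a hypothetical helper `slice`; the index formula (slice i covers positions from i*n/numSlices up to (i+1)*n/numSlices) is an assumption about how parallelize splits a sequence, chosen because it reproduces both outputs above.

```scala
// Local model (assumption): slice i holds the elements at positions
// [i * n / numSlices, (i + 1) * n / numSlices).
def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] = {
  val n = data.length.toLong
  (0 until numSlices).map { i =>
    val start = ((i * n) / numSlices).toInt
    val end = (((i + 1) * n) / numSlices).toInt
    data.slice(start, end)
  }
}

val twentyInSix = slice(1 to 20, 6)
val fourInSix = slice(1 to 4, 6)
```

With 4 elements and 6 slices, slices 0 and 3 get an empty index range, which matches the two empty lists in the output above.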

### sample(withReplacement, fraction, [seed])
```scala
  //Return a sampled subset of this RDD.
  def sample(withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T]     
```
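Returns an RDD containing a sample of roughly the given fraction of the data, using seed to seed the random number generator. withReplacement specifies whether to sample with replacement.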
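
Two properties of sampling without replacement are worth seeing in isolation: the sample size is only approximately fraction * n, and a fixed seed makes the sample reproducible. The sketch below is a local model using `scala.util.Random`; `bernoulliSample` is a hypothetical helper, and Spark's own samplers are not claimed to pick the same elements.

```scala
import scala.util.Random

// Local model: keep each element independently with probability `fraction`.
// This is a sketch of the idea, not Spark's sampler.
def bernoulliSample[T](data: Seq[T], fraction: Double, seed: Long): Seq[T] = {
  val rng = new Random(seed)
  data.filter(_ => rng.nextDouble() < fraction)
}

val data = 1 to 100
val first = bernoulliSample(data, 0.2, 3L)
val second = bernoulliSample(data, 0.2, 3L)   // same seed, same sample
```

Since every element is kept or dropped independently, the sample has no duplicates and its size varies from run to run unless the seed is fixed.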

In [50]:
val rdd = sc.parallelize(1 to 100)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[70] at parallelize at <console>:25


In [51]:
rdd.sample(false,0.2,3).collect

res28: Array[Int] = Array(1, 16, 20, 22, 29, 30, 32, 38, 43, 46, 52, 56, 62, 71, 76, 87, 89, 91, 92, 95)


In [52]:
rdd.sample(false,0.2,3).collect

res29: Array[Int] = Array(1, 16, 20, 22, 29, 30, 32, 38, 43, 46, 52, 56, 62, 71, 76, 87, 89, 91, 92, 95)


In [53]:
rdd.sample(false,0.2).collect

res30: Array[Int] = Array(7, 9, 10, 13, 23, 24, 29, 44, 46, 48, 50, 54, 55, 59, 66, 68, 74, 79, 81, 83, 86, 91, 95)


In [54]:
rdd.sample(false,0.2).collect

res31: Array[Int] = Array(3, 6, 11, 14, 16, 21, 27, 34, 43, 45, 52, 63, 65, 68, 76, 79, 82, 91, 95, 96)


### repartition(numPartitions)
```scala
  //Return a new RDD that has exactly numPartitions partitions.
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }   
```
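Reshuffles all the data randomly across the network into the given number of partitions. When the target number of partitions is smaller than the current number, consider using coalesce, which can avoid the shuffle.

Sometimes we want to change the partitioning of an RDD outside the context of grouping and aggregation operations. For those cases, Spark provides the repartition() function, which shuffles the data across the network to create a new set of partitions. Keep in mind that repartitioning data is a fairly expensive operation. Spark also has an optimized version of repartition() called coalesce(). Check the number of partitions with rdd.partitions.size in Scala (rdd.partitions().size() in Java) or rdd.getNumPartitions() in Python, and make sure that coalesce() merges the RDD into fewer partitions than it currently has.

Spark offers two ways to tune the degree of parallelism of operations. The first is to pass a parameter giving the degree of parallelism for the resulting RDD in operations that shuffle data. The second is to repartition any existing RDD to obtain more or fewer partitions. Repartitioning is done with repartition(), which randomly shuffles the RDD into the desired number of partitions. If you know you are reducing the number of partitions, you can use coalesce(); because it does not shuffle the data, it is more efficient than repartition(). If you think the current parallelism is too high or too low, these methods let you redistribute the data.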

In [55]:
val rdd = sc.parallelize(1 to 20,6)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[75] at parallelize at <console>:25


In [56]:
rdd.partitions.size

res32: Int = 6


In [57]:
val rerdd = rdd.repartition(10)

rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[79] at repartition at <console>:27


In [58]:
rerdd.partitions.size

res33: Int = 10


### coalesce(numPartitions)
```scala
  def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T]   
```
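Reduces the number of partitions. It is useful after filtering a large dataset, to make operations on the resulting smaller dataset more efficient.

It repartitions the elements of the RDD.  
When the number of partitions is reduced moderately, e.g. 1000 -> 100, Spark merges every 10 of the original partitions into one, the parallelism becomes 100, and no shuffle occurs.  
When the number is reduced drastically, e.g. 1000 -> 1, the computation runs with a parallelism of 1. To avoid this loss of parallelism, set shuffle to true. The computation before and after the shuffle is then split into different stages, with parallelism 1000 and 1 respectively.  
Increasing the number of partitions always requires a shuffle, so shuffle must be set to true; with shuffle = false the RDD keeps its current number of partitions.  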
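
The no-shuffle behaviour can be pictured as merging whole partitions without moving individual elements between them. The sketch below is a deliberately simplified local model: `coalesceLocal` is a hypothetical helper that merges adjacent partitions, whereas Spark's real grouping may differ (for example, it can take data locality into account).

```scala
// Simplified local model of coalesce with shuffle = false (assumption:
// adjacent partitions are merged; Spark's actual grouping may differ).
def coalesceLocal[T](parts: Seq[Seq[T]], numPartitions: Int): Seq[Seq[T]] = {
  val n = parts.length
  if (numPartitions >= n) parts   // cannot grow without a shuffle
  else (0 until numPartitions).map { i =>
    val start = i * n / numPartitions
    val end = (i + 1) * n / numPartitions
    parts.slice(start, end).flatten   // whole partitions are concatenated
  }
}

val parts = Seq(
  Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9, 10),
  Seq(11, 12, 13), Seq(14, 15, 16), Seq(17, 18, 19, 20))

val shrunk = coalesceLocal(parts, 2)
val notGrown = coalesceLocal(parts, 10)
```

In this model, asking for 10 partitions leaves the 6 original partitions untouched, which is why growing the partition count needs shuffle = true (or repartition).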

In [59]:
val rdd = sc.parallelize(1 to 20,6)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[80] at parallelize at <console>:25


In [60]:
val rerdd = rdd.coalesce(2)

rerdd: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[81] at coalesce at <console>:27


In [61]:
rerdd.partitions.size

res34: Int = 2


### glom
```scala
def glom(): RDD[Array[T]]  
```
Turns each partition into an array, producing a new RDD of type RDD[Array[T]].

In [62]:
val rdd = sc.parallelize(1 to 20)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[82] at parallelize at <console>:25


In [63]:
rdd.mapPartitionsWithIndex( (index,iter) => List((index,iter.toList)).toIterator ).collect

res35: Array[(Int, List[Int])] = Array((0,List(1, 2, 3)), (1,List(4, 5, 6)), (2,List(7, 8, 9, 10)), (3,List(11, 12, 13)), (4,List(14, 15, 16)), (5,List(17, 18, 19, 20)))


In [64]:
rdd.glom().collect

res36: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10), Array(11, 12, 13), Array(14, 15, 16), Array(17, 18, 19, 20))


### pipe(command, [envVars])
```scala
def pipe(command: String): RDD[String]
def pipe(command: String, env: Map[String, String]): RDD[String] 
```
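Runs a Perl or shell script on each partition and returns an RDD of its output lines.

The pipe operation:

Spark provides a pipe() method on RDDs. With pipe(), a program written in any language can read the elements of the RDD as strings from standard input, process them as needed, and write the results as strings to standard output. This process is itself an RDD transformation.

Using pipe() is simple: given an executable script, written in some other language, that reads data from standard input and writes its results to standard output, distribute the script to the same path on every node and pass that path to pipe().

An example:

**Note: the shell script must be accessible from every node in the cluster.**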
```scala
scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),1)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at <console>:24

scala> rdd.pipe("/home/bigdata/pipe.sh").collect()
res18: Array[String] = Array(AA, >>>hi, >>>Hello, >>>how, >>>are, >>>you)

scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),2)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[52] at parallelize at <console>:24

scala> rdd.pipe("/home/bigdata/pipe.sh").collect()
res19: Array[String] = Array(AA, >>>hi, >>>Hello, AA, >>>how, >>>are, >>>you)

pipe.sh:
#!/bin/sh
echo "AA"
while read LINE; do
   echo ">>>"${LINE}
done
```

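### Pair RDD Transformations
Spark provides some special operations for RDDs of key-value pairs. These RDDs are called pair RDDs. Pair RDDs are a building block in many programs, because they expose operations for acting on each key in parallel or for regrouping data across nodes. For example, pair RDDs have a reduceByKey() method that reduces the data for each key separately, and a join() method that merges two RDDs by grouping together elements with the same key. It is common to extract fields from an RDD (for example, an event time, a user ID, or some other identifier) and use them as keys in pair RDD operations.

There are many ways to create pair RDDs in Spark, which are not covered here. Many formats that store key-value data return pair RDDs of their key-value data directly when loaded. In addition, a regular RDD can be turned into a pair RDD by calling map() with a function that returns key-value pairs.

In Scala, for the functions on keyed data to be available, the function must likewise return tuples. An implicit conversion on RDDs of tuples provides the additional key-value functions.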
```
// Create a pair RDD in Scala using the first word as the key
// (lines is assumed to be an RDD[String] defined elsewhere)
val pairs = lines.map(x => (x.split(" ")(0), x))
```

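Pair RDDs can use all the transformations available on standard RDDs. Because a pair RDD contains tuples, the functions passed must operate on tuples rather than on individual elements.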

### keys()
```scala
//Return an RDD with the keys of each tuple.
def keys: RDD[K] = self.map(_._1)  
```
Returns an RDD containing only the keys.

In [65]:
val rdd = sc.parallelize( List( ("tom", 1), ("jerry", 2), ("kitty", 3),("jerry", 9), ("tom", 8), ("shuke", 7) ) )

rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[85] at parallelize at <console>:25


In [66]:
rdd.keys.collect

res37: Array[String] = Array(tom, jerry, kitty, jerry, tom, shuke)


### values()
```scala
//Return an RDD with the values of each tuple.
def values: RDD[V] = self.map(_._2)
```
Returns an RDD containing only the values.

In [67]:
rdd.values.collect

res38: Array[Int] = Array(1, 2, 3, 9, 8, 7)


### mapValues(func)
```scala
def mapValues[U](f: V => U): RDD[(K, U)]
```
Applies a function to each value of a pair RDD without changing the key.

In [68]:
rdd.mapValues(_ * 10).collect

res39: Array[(String, Int)] = Array((tom,10), (jerry,20), (kitty,30), (jerry,90), (tom,80), (shuke,70))


### flatMapValues(func)
```scala
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
```
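Applies a function that returns an iterator to each value of a pair RDD and, for each element returned, produces a key-value record with the original key. It is often used for tokenization.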

In [69]:
rdd.flatMapValues( v => List(v,v)).collect

res40: Array[(String, Int)] = Array((tom,1), (tom,1), (jerry,2), (jerry,2), (kitty,3), (kitty,3), (jerry,9), (jerry,9), (tom,8), (tom,8), (shuke,7), (shuke,7))


### partitionBy(partitioner)
```scala
//Return a copy of the RDD partitioned using the specified partitioner.
def partitionBy(partitioner: Partitioner): RDD[(K, V)]   
```
Repartitions the source RDD according to the given partitioner, producing a new ShuffledRDD.

In [70]:
val rdd = sc.parallelize( List( ("a",1),("b",1),("c",1),("d",1),("a",1),("b",1),("g",1),("c",1),("b",1),("g",1),("d",1) ) ,4)

rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[90] at parallelize at <console>:25


In [71]:
rdd.mapPartitionsWithIndex( (index,iter) => List((index,iter.toList)).toIterator ).collect

res41: Array[(Int, List[(String, Int)])] = Array((0,List((a,1), (b,1))), (1,List((c,1), (d,1), (a,1))), (2,List((b,1), (g,1), (c,1))), (3,List((b,1), (g,1), (d,1))))


In [72]:
val rdd1 = rdd.partitionBy(new org.apache.spark.HashPartitioner(4))

rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[92] at partitionBy at <console>:27


In [73]:
rdd1.mapPartitionsWithIndex( (index,iter) => List((index,iter.toList)).toIterator ).collect

res42: Array[(Int, List[(String, Int)])] = Array((0,List((d,1), (d,1))), (1,List((a,1), (a,1))), (2,List((b,1), (b,1), (b,1))), (3,List((c,1), (g,1), (c,1), (g,1))))
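
The placement in the output above is consistent with hash partitioning, where a key goes to the partition given by its hash code modulo the number of partitions. The sketch below is a local model: `nonNegativeMod` and `hashPartition` are hypothetical helpers written for illustration, not calls into Spark.

```scala
// Local model of hash partitioning: partition = hashCode mod numPartitions,
// adjusted so the result is never negative; null keys go to partition 0.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

def hashPartition(key: Any, numPartitions: Int): Int = key match {
  case null => 0
  case k    => nonNegativeMod(k.hashCode, numPartitions)
}

// "a".hashCode is 97, "b" is 98, and so on, so with 4 partitions:
val placement = List("a", "b", "c", "d", "g").map(k => k -> hashPartition(k, 4)).toMap
```

The model places d in partition 0, a in 1, b in 2, and both c and g in 3, which agrees with the partition contents printed above.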


### reduceByKey(func, [numTasks])
```scala
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }

  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
    reduceByKey(new HashPartitioner(numPartitions), func)
  }

  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }
```
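Called on an RDD of (K,V) pairs, returns an RDD of (K,V) pairs in which the values for each key are aggregated with the given reduce function. As with groupByKey, the number of reduce tasks can be set through the optional second argument.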

In [74]:
val rdd = sc.parallelize( List( ("tom", 1), ("jerry", 2), ("kitty", 3),("jerry", 9), ("tom", 8), ("shuke", 7) ) )

rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[94] at parallelize at <console>:25


In [75]:
rdd.reduceByKey(_+_).collect

res43: Array[(String, Int)] = Array((tom,9), (kitty,3), (jerry,11), (shuke,7))


![reduceByKey](imgs/reduceByKey.jpg)

In [76]:
rdd.reduceByKey( (x,y) => x + y  ).collect

res44: Array[(String, Int)] = Array((tom,9), (kitty,3), (jerry,11), (shuke,7))


### groupByKey([numTasks])
```scala
  def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(defaultPartitioner(self))
  }

  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(new HashPartitioner(numPartitions))
  }

    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
```
Called on an RDD of (K,V) pairs, returns an RDD of (K, Iterable[V]) pairs.

In [77]:
val rdd = sc.parallelize( List( ("tom", 1), ("jerry", 2), ("kitty", 3),("jerry", 9), ("tom", 8), ("shuke", 7) ) )

rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[97] at parallelize at <console>:25


In [78]:
val groupRdd = rdd.groupByKey()

groupRdd: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[98] at groupByKey at <console>:27


In [79]:
groupRdd.collect

res45: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(1, 8)), (kitty,CompactBuffer(3)), (jerry,CompactBuffer(2, 9)), (shuke,CompactBuffer(7)))


In [80]:
groupRdd.map{ case (name,values) => (name,values.sum) }.collect

res46: Array[(String, Int)] = Array((tom,9), (kitty,3), (jerry,11), (shuke,7))


### groupBy(func, [numTasks])
```scala
  def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
    groupBy[K](f, defaultPartitioner(this))
  }

  def groupBy[K](
      f: T => K,
      numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
    groupBy(f, new HashPartitioner(numPartitions))
  }

  def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
      : RDD[(K, Iterable[T])] = withScope {
    val cleanF = sc.clean(f)
    this.map(t => (cleanF(t), t)).groupByKey(p)
  }
```
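Takes a one-argument function and uses its result as the key, returning a new RDD[(K, Iterable[T])] in which each value is an iterable of all the input elements that share the same key. **This is a function of ordinary RDDs.**

This function can turn an ordinary RDD into a pair RDD.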

In [81]:
val rdd=sc.parallelize(List(("a",1,2),("b",1,1),("a",4,5)))

rdd: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[100] at parallelize at <console>:25


In [82]:
rdd.groupBy(_._1).collect

res47: Array[(String, Iterable[(String, Int, Int)])] = Array((a,CompactBuffer((a,1,2), (a,4,5))), (b,CompactBuffer((b,1,1))))


In fact, pair RDDs can use this function too.

In [83]:
val pairRdd = sc.parallelize( List( ("tom", 1), ("jerry", 2), ("kitty", 3),("jerry", 9), ("tom", 8), ("shuke", 7) ) )

pairRdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[103] at parallelize at <console>:25


In [84]:
pairRdd.groupBy(_._1).collect

res48: Array[(String, Iterable[(String, Int)])] = Array((tom,CompactBuffer((tom,1), (tom,8))), (kitty,CompactBuffer((kitty,3))), (jerry,CompactBuffer((jerry,2), (jerry,9))), (shuke,CompactBuffer((shuke,7))))


### combineByKey[C](  createCombiner: V => C,  mergeValue: (C, V) => C,  mergeCombiners: (C, C) => C) 

```scala
  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
  }

  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      numPartitions: Int): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
  }

  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
      partitioner, mapSideCombine, serializer)(null)
  }
```
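It can produce the same result as reduceByKey; under the hood, reduceByKey is built on the same machinery (both call combineByKeyWithClassTag).

Within each partition, values with the same key are aggregated using createCombiner and mergeValue; mergeCombiners then merges the per-partition results.

Parameters:  
* First parameter, createCombiner: V => C: creates the combiner. For each key, it takes the first value and returns the type you want to combine into.  
* Second parameter, mergeValue: (C, V) => C: a function for the local (per-partition) computation.  
* Third parameter, mergeCombiners: (C, C) => C: a function that further combines the local results.  

Parameters, in more detail: 
* For each key K, the values V are combined into a single result.  
* createCombiner: as combineByKey() goes through the elements of a partition, each element either has a key that has not been seen before or has the same key as a previous element. If the key is new, combineByKey() uses the createCombiner() function to create the initial value of the accumulator for that key.  
* mergeValue: if the key has already been seen while processing the current partition, it uses mergeValue() to merge the current accumulator value for that key with the new value.  
* mergeCombiners: since each partition is processed independently, there can be several accumulators for the same key. When two or more partitions have an accumulator for the same key, the user-supplied mergeCombiners() function merges the per-partition results.  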
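
The three functions can be traced on a local model in which a `Seq` of partitions stands in for the RDD. The helper `combineByKeyLocal` below is hypothetical and only mimics the order in which the functions are applied; it is not Spark's implementation. The two-partition split of the coffee/panda data is an assumption.

```scala
// Local model of combineByKey: combine inside each partition first,
// then merge the per-partition combiners.
def combineByKeyLocal[K, V, C](
    partitions: Seq[Seq[(K, V)]],
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): Map[K, C] = {
  val perPartition: Seq[Map[K, C]] = partitions.map { part =>
    part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.get(k) match {
        case Some(c) => acc.updated(k, mergeValue(c, v))    // key seen before
        case None    => acc.updated(k, createCombiner(v))   // first value for key
      }
    }
  }
  perPartition.foldLeft(Map.empty[K, C]) { (merged, partial) =>
    partial.foldLeft(merged) { case (acc, (k, c)) =>
      acc.get(k) match {
        case Some(existing) => acc.updated(k, mergeCombiners(existing, c))
        case None           => acc.updated(k, c)
      }
    }
  }
}

// Assumed split of the data into 2 partitions.
val parts = Seq(
  Seq(("coffee", 1), ("coffee", 2)),
  Seq(("panda", 3), ("coffee", 9)))

// Accumulator is (sum, count), as in the averaging example.
val sumCount = combineByKeyLocal[String, Int, (Int, Int)](
  parts,
  v => (v, 1),
  (acc, v) => (acc._1 + v, acc._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2))

val averages = sumCount.map { case (k, (sum, count)) => (k, sum / count.toFloat) }
```

In this trace createCombiner runs twice for coffee (once in each partition), and mergeCombiners then joins the two coffee accumulators.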


In [85]:
// Use combineByKey() to compute the average value per key
val rdd = sc.parallelize( List( ("coffee", 1), ("coffee", 2), ("panda", 3),("coffee", 9) ) ,2)

rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[106] at parallelize at <console>:26


In [86]:
val result = rdd.combineByKey(
(v) => (v, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }

result: org.apache.spark.rdd.RDD[(String, Float)] = MapPartitionsRDD[108] at map at <console>:31


In [87]:
result.collect

res49: Array[(String, Float)] = Array((coffee,4.0), (panda,3.0))


In [88]:
result.collectAsMap().map(println(_))

(coffee,4.0)
(panda,3.0)


res50: Iterable[Unit] = ArrayBuffer((), ())


In [89]:
result.collectAsMap()

res51: scala.collection.Map[String,Float] = Map(coffee -> 4.0, panda -> 3.0)


Diagram of the combineByKey flow:
![combineByKey](imgs/combineByKey.jpg)

### aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]

```scala
  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
  }

  def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
  }

  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
    val zeroArray = new Array[Byte](zeroBuffer.limit)
    zeroBuffer.get(zeroArray)

    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
    val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

    // We will clean the combiner closure later in `combineByKey`
    val cleanedSeqOp = self.context.clean(seqOp)
    combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
      cleanedSeqOp, combOp, partitioner)
  }
```
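The source code of aggregateByKey shows the following:

a. aggregateByKey transforms an RDD of type (K,V) into an RDD of type (K,U). V and U may differ, just as with combineByKey; that is, the value type of the returned pairs may differ from the input value type.

b. aggregateByKey is implemented internally by calling combineByKeyWithClassTag, the same method that backs combineByKey. The createCombiner logic is derived from zeroValue, which serves as the initial value of the aggregation: typically 0 for a sum, 1 for a product, and an empty collection for collection operations.

c. seqOp plays the role of mergeValue in combineByKey, with type (U,V) => U.

d. combOp plays the role of mergeCombiners in combineByKey.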

In [90]:
// Use aggregateByKey() to compute the average value per key
val rdd = sc.parallelize( List( ("coffee", 1), ("coffee", 2), ("panda", 3),("coffee", 9) ) ,2)

rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[109] at parallelize at <console>:26


In [91]:
val result = rdd.aggregateByKey( (0,0) ) (
(acc, v) => (acc._1 + v, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }

result: org.apache.spark.rdd.RDD[(String, Float)] = MapPartitionsRDD[111] at map at <console>:30


In [92]:
result.collectAsMap()

res52: scala.collection.Map[String,Float] = Map(coffee -> 4.0, panda -> 3.0)


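As the averaging example shows, aggregateByKey supplies a zero value and so avoids the explicit createCombiner step of combineByKey. (createCombiner essentially initializes the accumulator the first time a key is encountered; rather than supplying a zero value, it converts the first (k,v) into the initial value of c.)

The following example walks through how aggregateByKey computes its result: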

In [93]:
// Define the RDD
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)

pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[112] at parallelize at <console>:26


In [94]:
// Inspect the partitioning
pairRDD.mapPartitionsWithIndex( 
    (index,iter) => iter.map(x => "[partID:" +  index + ", val: " + x + "]")
).collect

res53: Array[String] = Array([partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)])


In [95]:
pairRDD.mapPartitionsWithIndex( 
    (index,iter) => List(index + ":" + iter.mkString("[","; ","]")).iterator
).collect

res54: Array[String] = Array(0:[(cat,2); (cat,5); (mouse,4)], 1:[(cat,12); (dog,12); (mouse,2)])


In [96]:
// Note the difference between an initial value of 0 and other values
pairRDD.aggregateByKey(0)(_+_,_+_).collect

res55: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))


In [97]:
pairRDD.aggregateByKey(10)(_+_,_+_).collect

res56: Array[(String, Int)] = Array((dog,22), (cat,39), (mouse,26))


In [98]:
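// Differences between the next three calls. The first is easy to understand: since the initial value is 0, each partition outputs the maximum value for each animal, and these maxima are then summed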
pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect

res57: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))


In [99]:
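// The next two: with a nonzero initial value, the initial value takes part in the computation. The first partition holds ("cat",2), ("cat",5), ("mouse",4) and the initial value is 10. Each animal's values are compared pairwise, with the initial value included in the comparison, so the first partition outputs ("cat",10), ("mouse",10). The second partition is handled the same way and outputs (dog,12), (cat,12), (mouse,10). The final sums are therefore (dog,12), (cat,22), (mouse,20). Note that the initial value does not take part when the per-partition results are combined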
pairRDD.aggregateByKey(10)(math.max(_,_),_+_).collect

res58: Array[(String, Int)] = Array((dog,12), (cat,22), (mouse,20))


In [100]:
// This one works the same way as the previous one
pairRDD.aggregateByKey(100)(math.max(_,_),_+_).collect

res59: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
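
The results above can be reproduced with a local model in which the zero value is applied once per key per partition and never during the final merge. The helper `aggregateByKeyLocal` is hypothetical, and the two-partition split is copied from the partition listing printed earlier in this section.

```scala
// Local model of aggregateByKey: seqOp starts from zeroValue for each key
// inside each partition; combOp merges the per-partition results.
def aggregateByKeyLocal[K, V, U](partitions: Seq[Seq[(K, V)]], zeroValue: U)(
    seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
  val perPartition = partitions.map { part =>
    part.foldLeft(Map.empty[K, U]) { case (acc, (k, v)) =>
      acc.updated(k, seqOp(acc.getOrElse(k, zeroValue), v))
    }
  }
  // zeroValue is not used here: only already-aggregated values are merged.
  perPartition.flatten.groupBy(_._1).map { case (k, kvs) =>
    (k, kvs.map(_._2).reduce(combOp))
  }
}

val parts = Seq(
  Seq(("cat", 2), ("cat", 5), ("mouse", 4)),
  Seq(("cat", 12), ("dog", 12), ("mouse", 2)))

val sumZero0 = aggregateByKeyLocal(parts, 0)(_ + _, _ + _)
val sumZero10 = aggregateByKeyLocal(parts, 10)(_ + _, _ + _)
val maxZero10 = aggregateByKeyLocal(parts, 10)(math.max(_, _), _ + _)
val maxZero100 = aggregateByKeyLocal(parts, 100)(math.max(_, _), _ + _)
```

For example, with a zero value of 10 and addition, cat gets 10 + 2 + 5 = 17 in the first partition and 10 + 12 = 22 in the second, giving 39 after the merge. A zero value that is not neutral for seqOp therefore makes the result depend on how the data is partitioned.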


### foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
```scala
  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    foldByKey(zeroValue, defaultPartitioner(self))(func)
  }

  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
  }

  def foldByKey(zeroValue: V,partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    ...
    combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
      cleanedFunc, cleanedFunc, partitioner)
  }
```
A simplified form of aggregateByKey in which seqOp and combOp are the same function.

In [101]:
val rdd = sc.parallelize( List( ("coffee", 1), ("coffee", 2), ("panda", 3),("coffee", 9) ) ,2)

rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[120] at parallelize at <console>:25


In [102]:
rdd.foldByKey(0)(_ + _).collect

res60: Array[(String, Int)] = Array((coffee,12), (panda,3))


### sortByKey([ascending], [numTasks])
```scala
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
```
Called on an RDD of (K,V) pairs for which an implicit Ordering[K] is available; returns an RDD of (K,V) pairs sorted by key.

In [103]:
val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))

rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[122] at parallelize at <console>:25


In [104]:
rdd.sortByKey().collect

res61: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))


In [105]:
rdd.sortByKey(false).collect

res62: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))


### sortBy(func,[ascending], [numTasks])
```scala
  def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
    this.keyBy[K](f)
        .sortByKey(ascending, numPartitions)
        .values
  }
```
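Similar to sortByKey but more flexible: func first transforms each element, and the elements are sorted by comparing the transformed values.

It is implemented on top of sortByKey, sorting by the key that func generates. **This is a function of ordinary RDDs.**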
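
The keyBy, sortByKey, values pipeline shown in the source can be mirrored on a local list. The helper `sortByLocal` below is a hypothetical sketch on Scala collections, not Spark code; it only follows the same three steps.

```scala
// Local model of sortBy: attach the key, sort by the key, drop the key.
def sortByLocal[T, K](data: Seq[T], f: T => K, ascending: Boolean = true)(
    implicit ord: Ordering[K]): Seq[T] = {
  val keyed = data.map(x => (f(x), x))                    // keyBy(f)
  val effectiveOrd = if (ascending) ord else ord.reverse
  keyed.sortBy(_._1)(effectiveOrd).map(_._2)              // sortByKey, then values
}

val data = List(1, 2, 3, 4, 11)
val numeric = sortByLocal[Int, Int](data, x => x)
val lexical = sortByLocal[Int, String](data, _.toString)
val descending = sortByLocal[Int, Int](data, x => x, ascending = false)
```

The key only decides the order; the returned elements keep their original type, which is why sorting by `toString` still yields integers.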

In [106]:
val rdd = sc.parallelize(List(1,2,3,4,11))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[129] at parallelize at <console>:25


In [107]:
rdd.sortBy( x => x ).collect

res63: Array[Int] = Array(1, 2, 3, 4, 11)


In [108]:
rdd.sortBy( _.toString ).collect

res64: Array[Int] = Array(1, 11, 2, 3, 4)


### subtractByKey(otherRdd)
```scala
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] 
```
Removes the elements whose key matches a key in the other RDD.

In [109]:
val rdd = sc.parallelize( List( (1, 1), (2, 2), (2, 3),(3, 9),(3, 9) ) )

rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[140] at parallelize at <console>:25


In [110]:
val otherRdd = sc.parallelize( Array( (3,"a"),(4,"b") ) )

otherRdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[141] at parallelize at <console>:25


In [111]:
rdd.subtractByKey(otherRdd).collect

res65: Array[(Int, Int)] = Array((1,1), (2,2), (2,3))


### join(otherDataset, [numTasks])
```scala
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
    join(other, defaultPartitioner(self, other))
  }

  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = self.withScope {
    join(other, new HashPartitioner(numPartitions))
  }

  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues( pair =>
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
    )
  }
```
Performs an inner join of two RDDs. Called on RDDs of type (K, V) and (K, W); returns an RDD of (K, (V, W)) containing every pairing of the elements that share a key.

In [112]:
val left = sc.parallelize(List((1,"a"),(2,"a"),(2,"b"),(3,"c")))

left: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[143] at parallelize at <console>:25


In [113]:
val right = sc.parallelize(Array((2,4),(3,5),(4,6)))

right: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[144] at parallelize at <console>:25


In [114]:
left.join(right).collect

res66: Array[(Int, (String, Int))] = Array((2,(a,4)), (2,(b,4)), (3,(c,5)))


In [115]:
left.join(right).foreach{ case (a,(b,c)) => println(a,b,c)}

(3,c,5)
(2,a,4)
(2,b,4)


### leftOuterJoin(otherDataset, [numTasks])
```scala
  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = self.withScope {
    leftOuterJoin(other, defaultPartitioner(self, other))
  }
```
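Left outer join: every key of the left RDD is kept, and the right-hand value is wrapped in an Option (None when the key has no match).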

In [116]:
left.leftOuterJoin(right).foreach{ case (a,(b,c)) => println(a,b,c)}

(1,a,None)
(3,c,Some(5))
(2,a,Some(4))
(2,b,Some(4))


### rightOuterJoin(otherDataset, [numTasks])
```scala
  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = self.withScope {
    rightOuterJoin(other, defaultPartitioner(self, other))
  }
```
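Right outer join: every key of the right RDD is kept, and the left-hand value is wrapped in an Option (None when the key has no match).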

In [117]:
left.rightOuterJoin(right).foreach{ case (a,(b,c)) => println(a,b,c)}

(4,None,6)
(3,Some(c),5)
(2,Some(a),4)
(2,Some(b),4)


### cogroup(otherDataset, [numTasks])
```scala
  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
    cogroup(other, defaultPartitioner(self, other))
  }
```
Groups the data that share a key across the two RDDs. Called on RDDs of type (K, V) and (K, W); returns an RDD of type (K, (Iterable[V], Iterable[W])).

In [118]:
left.cogroup(right).collect

res70: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer())), (2,(CompactBuffer(a, b),CompactBuffer(4))), (3,(CompactBuffer(c),CompactBuffer(5))), (4,(CompactBuffer(),CompactBuffer(6))))


In [119]:
left.cogroup(right).foreach{ case (a,(b,c)) => b.zipAll(c,null,0).foreach{ case (b,c) => println(a,b,c) } }

(2,a,4)
(1,a,0)
(4,null,6)
(3,c,5)
(2,b,0)


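The operation above is not a true outer join: zipAll pairs values by position, so (2,b) is matched with the default 0 rather than with 4. zipAll would have to be replaced by a function that computes the Cartesian product, but I could not find one.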
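
One way to get the Cartesian product is a for-comprehension over both value groups, much as the `join` source above does. The sketch below is an assumption about what such a helper could look like; `outerPairs` is a hypothetical name, not a Spark or Scala library function.

```scala
// Hypothetical helper: pair up the two value groups that cogroup produces for one key.
// Both sides present -> Cartesian product; one side empty -> pad that side with None.
def outerPairs[V, W](vs: Iterable[V], ws: Iterable[W]): Seq[(Option[V], Option[W])] =
  if (vs.isEmpty) ws.toSeq.map(w => (Option.empty[V], Option(w)))
  else if (ws.isEmpty) vs.toSeq.map(v => (Option(v), Option.empty[W]))
  else for (v <- vs.toSeq; w <- ws.toSeq) yield (Option(v), Option(w))

// Key 2 in the notebook data has values (a, b) on the left and (4) on the right.
val bothSides = outerPairs(Seq("a", "b"), Seq(4))
// Key 1 exists only on the left, key 4 only on the right.
val leftOnly = outerPairs(Seq("a"), Seq.empty[Int])
val rightOnly = outerPairs(Seq.empty[String], Seq(6))
```

With the notebook's `left` and `right`, `left.cogroup(right).flatMapValues { case (vs, ws) => outerPairs(vs, ws) }` should then produce the same pairs as `fullOuterJoin`.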

In [120]:
left.fullOuterJoin(right).foreach{ case (a,(b,c)) => println(a,b,c)}

(1,Some(a),None)
(4,None,Some(6))
(3,Some(c),Some(5))
(2,Some(a),Some(4))
(2,Some(b),Some(4))


In [121]:
left.join(right).foreach{ case (a,(b,c)) => println(a,b,c)}

(3,c,5)
(2,a,4)
(2,b,4)


The zipAll function:

In [122]:
List(1, 2, 3).zipAll(List("a","b"),0,"")

res74: List[(Int, String)] = List((1,a), (2,b), (3,""))


In [123]:
List(1, 2).zipAll(List("a","b","c"),0,"")

res75: List[(Int, String)] = List((1,a), (2,b), (0,c))


## Action
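Once an action is triggered, a job is executed.

Some RDD actions return part or all of the RDD's data to the driver program as an ordinary collection or value.

The simplest and most common operation for returning data to the driver program is collect(), which returns the entire contents of the RDD. collect() is commonly used in unit tests, where the entire RDD is expected to be small enough to fit in memory, which makes it easy to compare the RDD's value with the expected result. Because the data has to be copied to the driver process, collect() requires that all of the data fit in the memory of a single machine.

take(n) returns n elements from the RDD and tries to access as few partitions as possible, so it may return a collection biased toward the first partitions. Note that these operations may not return the elements in the order you expect.

These operations are useful for unit tests and quick debugging, but they become a bottleneck when dealing with large amounts of data.

If an ordering is defined on the data, top() can extract the top few elements of an RDD. top() uses the default ordering of the data, but we can also supply our own comparison function to extract the top elements.

Sometimes we need a sample of our data in the driver program. takeSample(withReplacement, num, seed) takes a sample from the data, either with or without replacement.

Sometimes it is useful to perform an action on every element of an RDD without returning any result to the driver program. For example, we might send the data as JSON to a web server or write it to a database. In either case, the foreach() action performs a computation on each element of the RDD without bringing the RDD back locally.

The remaining standard operations on a basic RDD behave much as their names suggest. count() returns the number of elements, and countByValue() returns a map from each unique value to its count.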

### collect()
```scala
  //Return an array that contains all of the elements in this RDD.
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }
```
Returns all elements of the dataset to the driver program as an array.

In [124]:
val rdd = sc.parallelize( List(1, 2, 3, 3, 4 , 5) )

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[167] at parallelize at <console>:25


In [125]:
rdd.collect

res76: Array[Int] = Array(1, 2, 3, 3, 4, 5)


### count()
```scala
  //Return the number of elements in the RDD.
  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
```
The number of elements in the RDD.

In [126]:
rdd.count

res77: Long = 6


### countByValue()
```scala
  //Return the count of each unique value in this RDD as a local map of (value, count) pairs.
  //rdd.map(x => (x, 1L)).reduceByKey(_ + _)
  def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {
    map(value => (value, null)).countByKey()
  }
```
The number of times each element occurs in the RDD.

In [127]:
val rdd = sc.parallelize( List(1, 2, 3, 3, 4 , 5) )

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[168] at parallelize at <console>:25


In [128]:
rdd.countByValue

res78: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 1, 2 -> 1, 3 -> 2, 4 -> 1)


### countByKey()
```scala
  def countByKey(): Map[K, Long] = self.withScope {
    self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
  }
```
Counts the elements for each key. **This is an action on pair RDDs.**

In [129]:
val rdd = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))

rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[172] at parallelize at <console>:25


In [130]:
rdd.countByKey

res79: scala.collection.Map[String,Long] = Map(a -> 1, b -> 2, c -> 2)


In [131]:
rdd.countByValue

res80: scala.collection.Map[(String, Int),Long] = Map((b,2) -> 2, (a,1) -> 1, (c,2) -> 1, (c,1) -> 1)


### collectAsMap()
```scala
  def collectAsMap(): Map[K, V] = self.withScope {
    val data = self.collect()
    val map = new mutable.HashMap[K, V]
    map.sizeHint(data.length)
    data.foreach { pair => map.put(pair._1, pair._2) }
    map
  }
```
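Returns the result as a map for easy lookup. When a key occurs more than once, only one of its values is kept, because later entries overwrite earlier ones in the map. **This is an action on pair RDDs.**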

In [132]:
pairRDD.collectAsMap

res81: scala.collection.Map[String,Int] = Map(dog -> 12, cat -> 12, mouse -> 2)


### lookup(key)
```scala
  /**
   * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
   * RDD has a known partitioner by only searching the partition that the key maps to.
   */
  def lookup(key: K): Seq[V] = self.withScope {
    self.partitioner match {
      case Some(p) =>
        val index = p.getPartition(key)
        val process = (it: Iterator[(K, V)]) => {
          val buf = new ArrayBuffer[V]
          for (pair <- it if pair._1 == key) {
            buf += pair._2
          }
          buf
        } : Seq[V]
        val res = self.context.runJob(self, process, Array(index))
        res(0)
      case None =>
        self.filter(_._1 == key).map(_._2).collect()
    }
  }
```
Returns all values associated with the given key. **This is an action on pair RDDs.**

In [133]:
val rdd = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))

rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[178] at parallelize at <console>:25


In [134]:
rdd.lookup("c")

res82: Seq[Int] = WrappedArray(2, 1)
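
The doc comment in the source above says lookup is efficient when the RDD has a known partitioner. A minimal sketch of that case follows, assuming a HashPartitioner with 3 partitions (an arbitrary choice); the session setup and the names `session`, `pairs` and `valuesForC` are only there to make the snippet self-contained.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session when one exists; otherwise starts a local one.
val session = SparkSession.builder().master("local[2]").appName("lookup-sketch").getOrCreate()

val pairs = session.sparkContext
  .parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
  .partitionBy(new HashPartitioner(3)) // gives the RDD a known partitioner
  .cache()                             // avoids re-shuffling on every lookup

// With a partitioner set, lookup only scans the partition that "c" hashes to.
val valuesForC = pairs.lookup("c")
```

The order of the returned values is not guaranteed, so compare them after sorting.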


### take(num)
```scala
def take(num: Int): Array[T]
```
Returns an array of the first num elements of the RDD.

In [135]:
val rdd = sc.parallelize(Seq(10, 4, 2, 12, 3))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[181] at parallelize at <console>:25


In [136]:
rdd.take(2)

res83: Array[Int] = Array(10, 4)


### first()
```scala
  /**
   * Return the first element in this RDD.
   */
  def first(): T = withScope {
    take(1) match {
      case Array(t) => t
      case _ => throw new UnsupportedOperationException("empty collection")
    }
  }
```
Returns the first element of the RDD (similar to take(1)).

In [137]:
rdd.first

res84: Int = 10


### top(num)
```scala
  def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    takeOrdered(num)(ord.reverse)
  }
```
top returns the first num elements of the RDD according to the default (descending) ordering or a specified ordering.

In [138]:
rdd.top(2)

res85: Array[Int] = Array(12, 10)


### takeOrdered(num)([Ordering])
```scala
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
```
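Returns the first num elements of the RDD according to the provided ordering. takeOrdered is like top, except that it returns the elements in the opposite order to top.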

In [139]:
rdd.takeOrdered(2)

res86: Array[Int] = Array(2, 3)
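
The Ordering in the second parameter list can also be passed explicitly. A minimal sketch, assuming the same five numbers as above; the session setup and the names `session`, `numbers`, `largestTwo` and `closestToFive` are only there to make the snippet self-contained.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session when one exists; otherwise starts a local one.
val session = SparkSession.builder().master("local[2]").appName("takeOrdered-sketch").getOrCreate()
val numbers = session.sparkContext.parallelize(Seq(10, 4, 2, 12, 3))

// Reversing the natural ordering gives the two largest elements, like top(2).
val largestTwo = numbers.takeOrdered(2)(Ordering[Int].reverse)

// A derived ordering: the two elements closest to 5.
val closestToFive = numbers.takeOrdered(2)(Ordering.by((x: Int) => math.abs(x - 5)))
```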


### takeSample(withReplacement,num, [seed])
```scala
def takeSample(withReplacement: Boolean,num: Int,seed: Long = Utils.random.nextLong): Array[T]
```
Returns an array of num elements randomly sampled from the dataset; seed specifies the seed for the random number generator.

In [140]:
rdd.collect

res87: Array[Int] = Array(10, 4, 2, 12, 3)


In [141]:
rdd.takeSample(false,3)

res88: Array[Int] = Array(12, 3, 10)


In [142]:
rdd.takeSample(false,10)

res89: Array[Int] = Array(2, 3, 10, 4, 12)


In [143]:
rdd.takeSample(true,10)

res90: Array[Int] = Array(4, 10, 4, 4, 2, 3, 10, 3, 3, 12)


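As shown, sampling with replacement can return more elements than the RDD contains, whereas sampling without replacement returns at most all of them.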
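
The cells above leave seed at its random default, so every run gives a different sample. Passing a fixed seed should make the draw repeatable on the same RDD. A minimal sketch, with 42 as an arbitrary seed; the session setup and the names `session`, `numbers`, `firstDraw` and `secondDraw` are only there to make the snippet self-contained.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session when one exists; otherwise starts a local one.
val session = SparkSession.builder().master("local[2]").appName("takeSample-sketch").getOrCreate()
val numbers = session.sparkContext.parallelize(Seq(10, 4, 2, 12, 3))

// Same RDD, same seed: both draws are expected to return the same three elements.
val firstDraw = numbers.takeSample(withReplacement = false, num = 3, seed = 42L)
val secondDraw = numbers.takeSample(withReplacement = false, num = 3, seed = 42L)
```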

### reduce(func)
```scala
def reduce(f: (T, T) => T): T
```
Aggregates all elements of the RDD using func, which must be commutative and associative. Combines all the data in the RDD in parallel (for example, sum).

In [144]:
rdd.reduce((x, y) => x + y)

res91: Int = 31


In [145]:
rdd.reduce(_ + _)

res92: Int = 31


In [146]:
rdd.sum

res93: Double = 31.0
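
To see why func has to be commutative and associative, the plain-Scala sketch below models reduce as "reduce each partition, then reduce the partial results". `reduceModel` is a hypothetical helper, not Spark code, and it merges the partial results in a fixed order, which Spark does not promise.

```scala
// Hypothetical local model of RDD.reduce: split the data into partitions of a
// given size, reduce each partition, then reduce the per-partition results.
def reduceModel[T](data: Seq[T], partitionSize: Int)(f: (T, T) => T): T =
  data.grouped(partitionSize).map(_.reduce(f)).reduce(f)

val data = Seq(10, 4, 2, 12, 3)

// Addition is commutative and associative: the partitioning does not matter.
val sumOnePartition = reduceModel(data, 5)(_ + _)
val sumThreePartitions = reduceModel(data, 2)(_ + _)

// Subtraction is neither: the result changes with the partitioning.
val diffOnePartition = reduceModel(data, 5)(_ - _)    // ((((10-4)-2)-12)-3)
val diffThreePartitions = reduceModel(data, 2)(_ - _) // ((10-4) - (2-12)) - 3
```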


### fold(num)(func)
```scala
def fold(zeroValue: T)(op: (T, T) => T): T
```
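Same as reduce(), but an initial value must be provided, for example rdd.fold(0)((x, y) => x + y).

A fold operation: a simplified form of aggregate in which seqOp and combOp are the same function.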

In [147]:
rdd.fold(0)((x, y) => x + y)

res94: Int = 31


### aggregate (zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)
```scala
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
```
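Similar to reduce(), but typically used to return a value of a different type.

aggregate combines the elements within each partition using seqOp and the initial value, and then uses combOp to combine the per-partition results with the initial value (zeroValue). The type of the final result does not have to match the element type of the RDD.

To aggregate the elements of an RDD, a zero value must be supplied (because elements are accumulated, the accumulation needs a starting value). Elements are first aggregated within each partition according to seqOp (folding elements of type T into a per-partition result of type U), then the per-partition results are aggregated across partitions according to combOp, and the final result is returned.

The first parameter list holds the initial value; the second holds two functions, each taking two arguments and returning one value: the first merges elements within each partition, and the second merges the per-partition results.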

In [148]:
val rdd = sc.parallelize(Seq(10, 4, 2, 12, 3))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[187] at parallelize at <console>:25


In [149]:
//sum and count
rdd.aggregate( (0, 0) ) (
(x, y) =>(x._1 + y, x._2 + 1),
(x, y) =>(x._1 + y._1, x._2 + y._2)
)

res95: (Int, Int) = (31,5)


In [150]:
//trace how aggregate runs
rdd.aggregate( (1, 1) ) (
(x, y) =>{println(x,y,"   1=>   ",x._1 + y, x._2 + 1);(x._1 + y, x._2 + 1)},
(x, y) =>{println(x,y,"   2=>   ",x._1 + y._1, x._2 + y._2);(x._1 + y._1, x._2 + y._2)}
)

((1,1),4,   1=>   ,5,2)
((1,1),2,   1=>   ,3,2)
((1,1),3,   1=>   ,4,2)
((1,1),12,   1=>   ,13,2)
((1,1),10,   1=>   ,11,2)
((1,1),(1,1),   2=>   ,2,2)
((2,2),(3,2),   2=>   ,5,4)
((5,4),(5,2),   2=>   ,10,6)
((10,6),(4,2),   2=>   ,14,8)
((14,8),(13,2),   2=>   ,27,10)
((27,10),(11,2),   2=>   ,38,12)


res96: (Int, Int) = (38,12)


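As the trace shows, zeroValue is applied both within every partition and in the merge across partitions. The merge shows an extra (1,1) because this RDD has 6 partitions, one of which is empty. Each partition adds 1, and the merge across partitions adds one more, so starting from (1, 1) instead of (0, 0) increases each component of the result by 7.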

In [151]:
//the same with 5 partitions
sc.parallelize(Seq(10, 4, 2, 12, 3) , 5)
.aggregate( (1, 1) ) (
(x, y) =>{println(x,y,"   1=>   ",x._1 + y, x._2 + 1);(x._1 + y, x._2 + 1)},
(x, y) =>{println(x,y,"   2=>   ",x._1 + y._1, x._2 + y._2);(x._1 + y._1, x._2 + y._2)}
)

((1,1),4,   1=>   ,5,2)
((1,1),3,   1=>   ,4,2)
((1,1),2,   1=>   ,3,2)
((1,1),12,   1=>   ,13,2)
((1,1),10,   1=>   ,11,2)
((1,1),(5,2),   2=>   ,6,3)
((6,3),(4,2),   2=>   ,10,5)
((10,5),(3,2),   2=>   ,13,7)
((13,7),(13,2),   2=>   ,26,9)
((26,9),(11,2),   2=>   ,37,11)


res97: (Int, Int) = (37,11)


In [152]:
//the same with 3 partitions
sc.parallelize(Seq(10, 4, 2, 12, 3) , 3)
.aggregate( (1, 1) ) (
(x, y) =>{println(x,y,"   1=>   ",x._1 + y, x._2 + 1);(x._1 + y, x._2 + 1)},
(x, y) =>{println(x,y,"   2=>   ",x._1 + y._1, x._2 + y._2);(x._1 + y._1, x._2 + y._2)}
)

((1,1),4,   1=>   ,5,2)
((5,2),2,   1=>   ,7,3)
((1,1),12,   1=>   ,13,2)
((13,2),3,   1=>   ,16,3)
((1,1),10,   1=>   ,11,2)
((1,1),(16,3),   2=>   ,17,4)
((17,4),(7,3),   2=>   ,24,7)
((24,7),(11,2),   2=>   ,35,9)


res98: (Int, Int) = (35,9)
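
The three traces above can be reproduced without Spark. The sketch below is a plain-Scala model, not Spark's implementation: `slices` and `aggregateModel` are hypothetical helpers, and the even index split in `slices` is an assumption chosen because it matches the partition contents visible in the traces. The model merges partition results in partition order, whereas Spark merges them as they arrive.

```scala
// Assumed slicing rule: slice i covers indices [i * n / p, (i + 1) * n / p).
def slices[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] =
  (0 until numSlices).map { i =>
    val start = i * data.length / numSlices
    val end = (i + 1) * data.length / numSlices
    data.slice(start, end)
  }

// zero seeds the fold inside every partition (even an empty one)
// and once more when the per-partition results are merged.
def aggregateModel[T, U](data: Seq[T], numSlices: Int)(zero: U)(
    seqOp: (U, T) => U, combOp: (U, U) => U): U = {
  val perPartition = slices(data, numSlices).map(_.foldLeft(zero)(seqOp))
  perPartition.foldLeft(zero)(combOp)
}

val data = Seq(10, 4, 2, 12, 3)

// Sum and count with the (1, 1) zero value used in the traces.
def sumAndCount(numSlices: Int): (Int, Int) =
  aggregateModel(data, numSlices)((1, 1))(
    (acc, x) => (acc._1 + x, acc._2 + 1),
    (a, b) => (a._1 + b._1, a._2 + b._2))

val withSix = sumAndCount(6)
val withFive = sumAndCount(5)
val withThree = sumAndCount(3)
```

With p partitions each component ends up p + 1 larger than it would with a (0, 0) zero value, which is why a zero value should normally be the identity of both operations.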


The next example is simpler:

In [153]:
val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)

rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[190] at parallelize at <console>:25


In [154]:
//Note: the partitions are computed in parallel and their results can arrive in either order, so two outcomes are possible: abcdef or defabc
rdd2.aggregate("")(_+_,_+_)

res99: String = defabc


In [155]:
//Note: the partitions are computed in parallel and their results can arrive in either order, so two outcomes are possible: abcdef or defabc
rdd2.aggregate("")(_+_,_+_)

res100: String = defabc


In [156]:
rdd2.aggregate("=")(_+_,_+_)

res101: String = ==def=abc


### saveAsTextFile(path)
```scala
def saveAsTextFile(path: String): Unit 
```
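Saves the elements of the dataset as a text file in HDFS or any other supported file system. Spark calls toString on each element to convert it to a line of text in the file. **The path must not already exist, as with HDFS output directories.**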

Saving locally:
```scala
//Saved under F:\hadoop_test\rdd, one file per partition, much like MapReduce output; even the file names are similar
rdd.saveAsTextFile("""F:\hadoop_test\rdd""")
```
![saveAsTextFile](imgs/saveAsTextFile.jpg)


Saving to HDFS:
```scala
rdd.saveAsTextFile("""hdfs://hadoop01:9000/out10""")
```

### saveAsSequenceFile(path) 
```scala
class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag](
    self: RDD[(K, V)],
    _keyWritableClass: Class[_ <: Writable],
    _valueWritableClass: Class[_ <: Writable])
  extends Logging
  with Serializable {
  def saveAsSequenceFile(
      path: String,
      codec: Option[Class[_ <: CompressionCodec]] = None): Unit
}
```
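Saves the elements of the dataset in Hadoop SequenceFile format under the given directory, which can be on HDFS or any other Hadoop-supported file system. **Only key-value pair RDDs support this function.**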

```scala
pairRDD.saveAsSequenceFile("""F:\hadoop_test\rdd1""")
pairRDD.saveAsSequenceFile("""hdfs://hadoop01:9000/out11""")
```

### saveAsObjectFile(path) 
```scala
//Save this RDD as a SequenceFile of serialized objects.
def saveAsObjectFile(path: String): Unit
```
Serializes the elements of the RDD as objects and stores them in a file.

```scala
rdd.saveAsObjectFile("""F:\hadoop_test\rdd1""")
rdd.saveAsObjectFile("""hdfs://hadoop01:9000/out11""")
```

In [157]:
//rdd.saveAsObjectFile("""F:\hadoop_test\rdd1""")
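
A round trip through `saveAsObjectFile` and `sc.objectFile` could look like the sketch below. It assumes the default file system is the local one and writes to a not-yet-existing child of a fresh temporary directory; the session setup and the names `session`, `context`, `outputPath` and `restored` are only there to make the snippet self-contained.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session when one exists; otherwise starts a local one.
val session = SparkSession.builder().master("local[2]").appName("objectFile-sketch").getOrCreate()
val context = session.sparkContext

// The output directory must not exist yet, so point at a new child of a temp directory.
val outputPath = Files.createTempDirectory("rdd-object-file").resolve("out").toString

context.parallelize(Seq(10, 4, 2, 12, 3)).saveAsObjectFile(outputPath)

// Reading back requires the element type, since the file only stores serialized objects.
val restored = context.objectFile[Int](outputPath).collect()
```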

### foreach(func)
```scala
  //Applies a function f to all elements of this RDD.
  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }
```

Runs func on each element of the dataset.

In [158]:
var rdd = sc.makeRDD(1 to 10,2)
var sum = sc.accumulator(0)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[193] at makeRDD at <console>:25
sum: org.apache.spark.Accumulator[Int] = 0


In [159]:
rdd.foreach(sum+=_)

In [160]:
sum.value

res104: Int = 55
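
`sc.accumulator` as used above is deprecated from Spark 2.0 on in favour of the AccumulatorV2 API. A minimal sketch of the same sum with `longAccumulator` follows; the session setup and the names `session`, `context` and `total` are only there to make the snippet self-contained.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session when one exists; otherwise starts a local one.
val session = SparkSession.builder().master("local[2]").appName("accumulator-sketch").getOrCreate()
val context = session.sparkContext

// A named accumulator also shows up in the Spark Web UI.
val total = context.longAccumulator("total")

// Tasks can only add to the accumulator; the driver reads the merged value afterwards.
context.makeRDD(1 to 10, 2).foreach(x => total.add(x))

val result = total.value
```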
