### The two types of RDD operations

In [17]:
// Load the local data first so that later operations can use it

val lines = sc.textFile("../data/data.txt")

lines: org.apache.spark.rdd.RDD[String] = ../data/data.txt MapPartitionsRDD[12] at textFile at <console>:27


In [18]:
// Return the first few elements

lines.take(5)

res12: Array[String] = Array("ELECBOOK CLASSICS ", "DAVID ", "COPPERFIELD ", "", "Charles Dickens ")


### Transformations

#### 1. map(func)

Passes each element through the function func and returns a new dataset made up of the results.

In [27]:
// Use map to turn each line (a string) in lines into its length, producing a new dataset of line lengths

var lineLengths = lines.map(line => line.length)

lineLengths: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[17] at map at <console>:28


In [28]:
lineLengths.take(5)

res17: Array[Int] = Array(17, 6, 12, 0, 16)


#### 2. filter(func)

Returns a new dataset made up of the elements for which func returns true.

In [10]:
var linesFiltered = lines.filter(line => line.length > 10)

linesFiltered: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at <console>:26


In [11]:
linesFiltered.take(5)

res6: Array[String] = Array("ELECBOOK CLASSICS ", "COPPERFIELD ", "Charles Dickens ", ELECBOOK CLASSICS, ebc0004. Charles Dickens: David Copperfield)


#### 3. flatMap(func)

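Similar to map(), except that each input element can be mapped to zero or more output elements, so func should return a sequence (Seq) rather than a single value.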

In [9]:
var linesFlatMap = lines.flatMap(line => line.split(" "))

linesFlatMap: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at flatMap at <console>:26


In [10]:
linesFlatMap.take(5)

res6: Array[String] = Array(ELECBOOK, CLASSICS, DAVID, COPPERFIELD, "")


#### 4. mapPartitions(func)

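Similar to map(), except that func runs once on each partition of the RDD rather than once per element, so func must be of type Iterator<T> => Iterator<U>.

Advantage of mapPartitions:

With a plain map, if a partition holds 10,000 records, func is called 10,000 times.
With mapPartitions, each task (one task per partition) calls func only once, passing it an iterator over all of the partition's data. Any per-call overhead, such as expensive setup work, is paid once per partition instead of once per record, which can improve performance.

Disadvantage of mapPartitions:

With a plain map, each call to func handles a single record. If memory runs short after, say, 1,000 records have been processed, those records can be garbage-collected to free up space. With mapPartitions, func receives the whole partition in one call; if it materializes all of that data in memory at once, a large partition can cause an out-of-memory error.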

In [29]:
val linesMapPartition = lines.mapPartitions(x => x.map(x => x.length()))

linesMapPartition: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[18] at mapPartitions at <console>:26


In [30]:
linesMapPartition.take(5)

res18: Array[Int] = Array(17, 6, 12, 0, 16)
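
To make the "once per partition" behavior concrete, here is a minimal sketch. It assumes the notebook's `sc` and uses a small in-memory RDD built with `sc.parallelize`; the names `nums` and `partitionSums` are made up for illustration. Because func is called once per partition, returning a single-element iterator yields exactly one result per partition.

```scala
// Assumes `sc` is the SparkContext provided by the notebook / spark-shell.
// 1 to 10 is split into 2 partitions: (1..5) and (6..10)
val nums = sc.parallelize(1 to 10, 2)

// func receives the whole partition as an Iterator[Int] and is called once per partition
val partitionSums = nums.mapPartitions(iter => Iterator(iter.sum)).collect()
// partitionSums: Array(15, 40)
```

Consuming the iterator lazily, as `iter.sum` does here, avoids holding the whole partition in memory.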


#### 5. mapPartitionsWithIndex(func)

Similar to mapPartitions(), except that func also receives an integer giving the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U>.

Reference: https://blog.csdn.net/qq_37050372/article/details/82620737

In [166]:
val linesMapPartition = lines.mapPartitionsWithIndex(
    (index: Int, x: Iterator[String]) => x.map(
        (x) => x.length()
    )
)

linesMapPartition: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[212] at mapPartitionsWithIndex at <console>:26


In [167]:
linesMapPartition.take(5)

res55: Array[Int] = Array(18, 6, 12, 0, 16)
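
The cell above ignores the `index` argument, so its result is the same as with mapPartitions. As a small sketch of what the index is for (assuming the notebook's `sc`; `letters` and `tagged` are illustrative names), the following tags every element with the partition it lives in:

```scala
// Assumes `sc` is the SparkContext provided by the notebook / spark-shell.
// Four elements in 2 partitions: ("a", "b") and ("c", "d")
val letters = sc.parallelize(Seq("a", "b", "c", "d"), 2)

// Pair each element with the index of its partition
val tagged = letters.mapPartitionsWithIndex(
    (index, iter) => iter.map(x => (index, x))
).collect()
// tagged: Array((0,a), (0,b), (1,c), (1,d))
```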


#### 6. sample(withReplacement, fraction, seed)

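Samples a fraction of the data, with or without replacement, using the given random number seed.

- withReplacement: whether a sampled element is put back after being drawn. true means it is put back, so the sample may contain duplicates.
- fraction: how much to sample, as a Double. Without replacement it is the probability of selecting each element, between 0 and 1 (for example, 0.3 samples roughly 30%). The sample size is approximate, not exact.
- seed: the seed for the random number generator. The first two parameters are usually enough; the seed is mainly useful for debugging, where fixing it makes the sample reproducible so you can tell whether a problem comes from the program or from the data.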

In [32]:
// withReplacement = true: elements may be selected more than once
// fraction = 0.3: sample about 30%

val linesSample = lines.sample(true, 0.3)

linesSample: org.apache.spark.rdd.RDD[String] = PartitionwiseSampledRDD[20] at sample at <console>:29


In [18]:
lines.count()

res9: Long = 34171


In [19]:
linesSample.count()

res10: Long = 10133


In [20]:
linesSample.take(10)

res11: Array[String] = Array("ELECBOOK CLASSICS ", "ELECBOOK CLASSICS ", "Low cost licenses are available. Contact us through our web site ", ". The Electric Book Co 1998 ", "", "", DAVID, "", THE PERSONAL HISTORY AND, "")
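
The effect of the seed can be checked with a short sketch, again assuming the notebook's `sc`; the seed value 42 and the names below are arbitrary choices for illustration. Sampling the same RDD twice with the same seed should return the same elements.

```scala
// Assumes `sc` is the SparkContext provided by the notebook / spark-shell.
val sampleInput = sc.parallelize(1 to 1000, 4)

// Same RDD, same fraction, same seed: the two samples are identical
val firstSample = sampleInput.sample(false, 0.3, 42L).collect()
val secondSample = sampleInput.sample(false, 0.3, 42L).collect()
val sameSample = firstSample.sameElements(secondSample)
```

Omitting the seed, as in the cell above, lets Spark pick a random one, so repeated runs generally return different samples.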


#### 7. union(otherDataset)

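Returns the union of two datasets: A.union(B) contains every element of A and of B, and duplicates are kept. In the example below, each line is split on whitespace (`\\s+`) and the empty strings are filtered out to build the words dataset, which is then unioned with itself.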

In [21]:
var words = lines.flatMap(line => line.split("\\s+")).filter(s => !s.isEmpty())

words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26


In [22]:
words.count()

res12: Long = 261269


In [144]:
val wordUnion = words.union(words)

wordUnion: org.apache.spark.rdd.RDD[String] = UnionRDD[208] at union at <console>:26


In [145]:
wordUnion.count()

res45: Long = 522538


#### 8. intersection(otherDataset)

Returns the intersection of two datasets. The result contains no duplicates, even if the inputs do.

In [136]:
var words = lines.flatMap(line => line.split("\\s+"))
                     .filter(s => !s.isEmpty())

words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[201] at filter at <console>:27


In [141]:
val wordsCount = words.count()

wordsCount: Long = 261269


In [142]:
val wordIntersection = words.intersection(words)

wordIntersection: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[207] at intersection at <console>:26


In [143]:
wordIntersection.count()

res44: Long = 26480


#### 9. distinct([numPartitions])

Returns a new dataset containing the distinct elements of the source dataset (deduplication).

In [129]:
var words = lines.flatMap(line => line.split("\\s+")).filter(s => !s.isEmpty())

words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[190] at filter at <console>:27


In [130]:
words.count()

res41: Long = 261269


In [134]:
val wordDistinct = words.distinct(5)

wordDistinct: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[199] at distinct at <console>:26


In [135]:
wordDistinct.count()

res43: Long = 26480


#### 10. groupByKey([numPartitions])

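When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.

**Note 1:** If you are grouping in order to aggregate over each key (for example, a sum or an average), reduceByKey or aggregateByKey will give better performance.

**Note 2:** By default, the level of parallelism depends on the number of partitions of the parent RDD. You can pass the optional numPartitions argument to use a different number of tasks **(one task for each partition)**.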

In [126]:
var wordPairs = lines.flatMap(line => line.split("\\s+"))
                     .filter(s => !s.isEmpty())
                     .map(s => (s, 1))

wordPairs: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[187] at map at <console>:28


In [127]:
val wordGroupByKey = wordPairs.groupByKey()

wordGroupByKey: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[188] at groupByKey at <console>:26


In [128]:
wordGroupByKey.take(5)

res40: Array[(String, Iterable[Int])] = Array((Ah!,CompactBuffer(1, 1, 1, 1, 1, 1, 1)), (intimately,CompactBuffer(1)), (bone,CompactBuffer(1)), (twins;,CompactBuffer(1)), (Never,,CompactBuffer(1, 1)))


#### 11. reduceByKey(func, [numPartitions])

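When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs in which the values for each key are aggregated using the reduce function func, which must be of type (V, V) => V. Conceptually, the values are first grouped by key into (K, (V1, V2, ...)) and func then reduces (V1, V2, ...) to a single V. As with groupByKey, the number of tasks can be configured through the optional numPartitions argument.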

In [118]:
var wordPairs = lines.flatMap(line => line.split("\\s+"))
                     .filter(s => !s.isEmpty())
                     .map(s => (s, 1))

wordPairs: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[180] at map at <console>:28


In [124]:
val wordReduceByKey = wordPairs.reduceByKey((a, b) => a + b)

wordReduceByKey: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[184] at reduceByKey at <console>:26


In [125]:
wordReduceByKey.take(5)

res39: Array[(String, Int)] = Array((Ah!,7), (intimately,1), (bone,1), (twins;,1), (Never,,2))


#### 12. aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])

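When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs in which the values for each key are aggregated using the given combine functions and a neutral zero value. seqOp merges a value V into an accumulator U within a partition, and combOp merges two accumulators U across partitions, so the result type U may differ from the value type V. As with groupByKey, the number of tasks can be configured through the optional numPartitions argument.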

In [110]:
var wordPairs = lines.flatMap(line => line.split("\\s+"))
                     .filter(s => !s.isEmpty())
                     .map(s => (s, 1))

wordPairs: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[176] at map at <console>:28


In [113]:
// combOp merges two partial results for the same key from different partitions
def combOp(a: Int, b: Int): Int = {
    a + b
}

combOp: (a: Int, b: Int)Int


In [114]:
// seqOp folds one value (b) into the running result (a) within a partition
def seqOp(a: Int, b: Int): Int = {
    a + b
}

seqOp: (a: Int, b: Int)Int


In [115]:
val wordAggregateByKey = wordPairs.aggregateByKey(0)(seqOp, combOp)

wordAggregateByKey: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[177] at aggregateByKey at <console>:30


In [116]:
wordAggregateByKey.take(5)

res35: Array[(String, Int)] = Array((Ah!,7), (intimately,1), (bone,1), (twins;,1), (Never,,2))
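
In the word count above, the result type U equals the value type V, so aggregateByKey behaves like reduceByKey. A case where U differs is a per-key average, sketched below under the assumption that `sc` is the notebook's SparkContext; the `scores` data and all names are invented for illustration. The accumulator is a (sum, count) pair.

```scala
// Assumes `sc` is the SparkContext provided by the notebook / spark-shell.
val scores = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 4)))

// zeroValue (0, 0) is the empty (sum, count) accumulator
val sumCount = scores.aggregateByKey((0, 0))(
    (acc, v) => (acc._1 + v, acc._2 + 1),   // seqOp: fold one value into (sum, count)
    (x, y) => (x._1 + y._1, x._2 + y._2)    // combOp: merge two (sum, count) accumulators
)

val averages = sumCount.mapValues { case (sum, count) => sum.toDouble / count }.collectAsMap()
// averages: Map(a -> 2.0, b -> 4.0)
```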


#### 13. sortByKey([ascending], [numPartitions])

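When called on a dataset of (K, V) pairs where K is ordered, returns a dataset of (K, V) pairs sorted by key in ascending or descending order, as specified by the boolean ascending argument.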

In [105]:
var wordPairs = lines.flatMap(line => line.split("\\s+"))
                     .filter(s => !s.isEmpty())
                     .map(s => (s, 1))

wordPairs: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[167] at map at <console>:28


In [106]:
var wordSortByKey = wordPairs.sortByKey(true, 5)

wordSortByKey: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[170] at sortByKey at <console>:26


In [107]:
wordSortByKey.take(5)

res33: Array[(String, Int)] = Array((&c.,1), (&c.,,1), ((0)181,1), ((Are,1), ((But,1))


In [108]:
var wordSortByKey = wordPairs.sortByKey(false, 5)

wordSortByKey: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[173] at sortByKey at <console>:26


In [109]:
wordSortByKey.take(5)

res34: Array[(String, Int)] = Array((??????it,1), (????would,1), (????was,1), (????to,1), (????that,1))


#### 14. join(otherDataset, [numPartitions])

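When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs containing every pair of values for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.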

In [34]:
var wordPairs = lines.flatMap(line => line.split(" "))
                     .map(s => (s, 1))

wordPairs: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[40] at map at <console>:27


In [35]:
val wordJoin = wordPairs.join(wordPairs, 5)

wordJoin: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[43] at join at <console>:26


In [37]:
wordJoin.take(5)

res10: Array[(String, (Int, Int))] = Array((weary,,(1,1)), (weary,,(1,1)), (weary,,(1,1)), (weary,,(1,1)), (softness,(1,1)))
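
The difference between an inner join and an outer join is easier to see on two small, different datasets. The sketch below assumes the notebook's `sc`; the `ages` and `cities` data are made up. With leftOuterJoin, keys missing from the right side are kept and their right-hand value becomes None.

```scala
// Assumes `sc` is the SparkContext provided by the notebook / spark-shell.
val ages = sc.parallelize(Seq(("alice", 30), ("bob", 25)))
val cities = sc.parallelize(Seq(("alice", "London")))

// Inner join: only keys present on both sides
val inner = ages.join(cities).collectAsMap()
// inner: Map(alice -> (30,London))

// Left outer join: every key from the left side, right value wrapped in Option
val leftOuter = ages.leftOuterJoin(cities).collectAsMap()
// leftOuter: Map(alice -> (30,Some(London)), bob -> (25,None))
```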


#### 15. cogroup(otherDataset, [numPartitions])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.

In [40]:
var wordPairs = lines.flatMap(line => line.split(" "))
                     .map(s => (s, 1))

wordPairs: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[45] at map at <console>:27


In [41]:
var wordCogroup = wordPairs.cogroup(wordPairs, 5)

wordCogroup: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[47] at cogroup at <console>:26


In [42]:
wordCogroup.take(5)

res13: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((weary,,(CompactBuffer(1, 1),CompactBuffer(1, 1))), (softness,(CompactBuffer(1),CompactBuffer(1))), (bone,(CompactBuffer(1),CompactBuffer(1))), (mantelpiece??I,(CompactBuffer(1),CompactBuffer(1))), (adoration,(CompactBuffer(1),CompactBuffer(1))))


#### 16. cartesian(otherDataset)

When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).

In [27]:
val newDataset = lines.cartesian(lines)

newDataset: org.apache.spark.rdd.RDD[(String, String)] = CartesianRDD[34] at cartesian at <console>:26


In [28]:
newDataset.take(5)

res6: Array[(String, String)] = Array(("ELECBOOK CLASSICS ","ELECBOOK CLASSICS "), ("ELECBOOK CLASSICS ","DAVID "), ("ELECBOOK CLASSICS ","COPPERFIELD "), ("ELECBOOK CLASSICS ",""), ("ELECBOOK CLASSICS ","Charles Dickens "))


#### 17. pipe(command, [envVars])

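Pipes each partition of the RDD through a shell command. The RDD's elements are written to the process's stdin, and the lines it writes to stdout are returned as an RDD of strings.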
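
A minimal sketch of pipe, assuming the notebook's `sc` and a Unix-like system where the `sort` command is available on every worker; the data and names are illustrative. A single partition is used so that the external command sees all elements at once.

```scala
// Assumes `sc` is the SparkContext provided by the notebook / spark-shell,
// and that the `sort` command exists on the machine running the task.
val unsorted = sc.parallelize(Seq("b", "a", "c"), 1)

// Each element is written as one line to sort's stdin; each stdout line becomes an element
val piped = unsorted.pipe("sort").collect()
// piped: Array(a, b, c)
```

With more than one partition, the command runs once per partition, so the output would only be sorted within each partition.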

#### 18. coalesce(numPartitions)

Decreases the number of partitions in the RDD to numPartitions. This is useful for running operations more efficiently after filtering down a large dataset.

In [26]:
lines.coalesce(5)

res5: org.apache.spark.rdd.RDD[String] = CoalescedRDD[33] at coalesce at <console>:28


#### 19. repartition(numPartitions)

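Reshuffles the data in the RDD randomly to create either more or fewer partitions, as given by numPartitions, and balances the data evenly across them. This always shuffles all of the data over the network.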

In [25]:
lines.repartition(5)

res4: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[32] at repartition at <console>:28
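
The two cells above do not show the resulting partition counts. The sketch below, assuming the notebook's `sc` (the RDD `wide` and the other names are illustrative), uses `getNumPartitions` to compare coalesce and repartition.

```scala
// Assumes `sc` is the SparkContext provided by the notebook / spark-shell.
val wide = sc.parallelize(1 to 100, 10)

// coalesce merges existing partitions without a shuffle, so it can only reduce the count
val fewer = wide.coalesce(2).getNumPartitions      // 2
val notMore = wide.coalesce(20).getNumPartitions   // still 10

// repartition always shuffles, so it can increase the count as well
val more = wide.repartition(20).getNumPartitions   // 20
```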


#### 20. repartitionAndSortWithinPartitions(partitioner)

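Repartitions the RDD according to the given partitioner and, within each resulting partition, sorts the records by their keys.

This is more efficient than calling repartition and then sorting within each partition, because the sorting is pushed down into the shuffle machinery.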
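
This section has no example, so here is a small sketch. It assumes the notebook's `sc` and uses Spark's `HashPartitioner` with 2 partitions; the data and the names `kv`, `partitioned`, and `layout` are invented for illustration. `glom()` turns each partition into an array so the layout can be inspected.

```scala
import org.apache.spark.HashPartitioner

// Assumes `sc` is the SparkContext provided by the notebook / spark-shell.
val kv = sc.parallelize(Seq((3, "c"), (1, "a"), (4, "d"), (2, "b")))

// Int keys are assigned to partition (key mod 2), then sorted by key inside each partition
val partitioned = kv.repartitionAndSortWithinPartitions(new HashPartitioner(2))

val layout = partitioned.glom().collect().map(_.toList).toList
// layout: List(List((2,b), (4,d)), List((1,a), (3,c)))
```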

### Actions

#### 1. reduce(func)

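Aggregates the elements of the dataset using the function func, which takes two arguments and returns one. The function should be commutative and associative so that it can be computed correctly in parallel.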

In [7]:
var lineLengths = lines.map(line => line.length)

lineLengths: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:26


In [8]:
val totalLength = lineLengths.reduce((x, y) => x + y)

totalLength: Int = 1435059


#### 2. collect()

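Returns all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.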

In [3]:
lines.collect()

res1: Array[String] = Array("ELECBOOK CLASSICS ", "DAVID ", "COPPERFIELD ", "", "Charles Dickens ", "", "", "
", ELECBOOK CLASSICS, "", "", ebc0004. Charles Dickens: David Copperfield, "", "", This file is free for individual use only. It must not be altered or resold., Organisations wishing to use it must first obtain a licence., "Low cost licenses are available. Contact us through our web site ", "", "", ". The Electric Book Co 1998 ", "", "The Electric Book Company Ltd ", "", "20 Cambridge Drive, London SE12 8AJ, UK ", "+44 (0)181 488 3872 www.elecbook.com ", "", "
", DAVID, COPPERFIELD, "", "", THE PERSONAL HISTORY AND, EXPERIENCE OF DAVID, COPPERFIELD THE YOUNGER, "", "", CHARLES DICKENS, "", "", AFFECTIONATELY INSCRIBED TO, THE HON. Mr. AND Mrs. RICHARD WATSON,, OF ROCKINGHAM, N...


#### 3. count()

Returns the number of elements in the dataset.

In [15]:
lines.count()

res8: Long = 34171


#### 4. first()

Returns the first element of the dataset (similar to take(1)).

In [16]:
lines.first()

res9: String = "ELECBOOK CLASSICS "


#### 5. take(n)

Returns an array with the first n elements of the dataset.

In [17]:
lines.take(5)

res10: Array[String] = Array("ELECBOOK CLASSICS ", "DAVID ", "COPPERFIELD ", "", "Charles Dickens ")


#### 6. takeSample(withReplacement, num, [seed])

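Returns an array with a random sample of num elements of the dataset. withReplacement specifies whether an element can be sampled more than once; the optional seed fixes the random number generator.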

In [18]:
lines.takeSample(false, 5)

res11: Array[String] = Array("", "??Yes. If you think so,?? said I. ", "have time gradually to make her familiar with my hopes, as ", "Charles Dickens ElecBook Classics ", "are so exactly what you used to be, with that agreeable face, and ")


In [19]:
lines.takeSample(false, 5, 777)

res12: Array[String] = Array("She next swept up the crumbs with a little broom (putting on a ", "Agnes, netting a purse. ", "On somebody??s motion, we resolved to go downstairs to the ", "that there was a tradition in the Commons that he lived principally ", "without explanation: she waving her hand and smiling farewell ")


In [24]:
lines.takeSample(true, 5, 777)

res17: Array[String] = Array("a charm, from the moment of his being usefully employed; and if ", "??He??s coming to himself,?? said Peggotty. ", "", "every conceivable variety of discouraging construction on all that ", "")


#### 7. takeOrdered(n, [ordering])

Returns the first n elements of the RDD using either their natural order or a custom ordering.

In [25]:
lines.takeOrdered(5)

res18: Array[String] = Array("", "", "", "", "")


In [49]:
class MyOrdering extends Ordering[String] {
    override def compare(x: String, y: String): Int = {
        return y.length() - x.length()
    }
}

defined class MyOrdering


In [50]:
var order = new MyOrdering()

order: MyOrdering = MyOrdering@752d291d


In [51]:
lines.takeOrdered(5)(order)

res25: Array[String] = Array("driveller next day, on receipt of a little lace-edged sheet of notepaper, ??Favoured by papa. To remind??; and passed the intervening ", "It was a likely place to sell a jacket in; for the dealers in secondhand clothes were numerous, and were, generally speaking, on the ", "I did. But I felt she was a little impracticable. It damped my newborn ardour, to find that ardour so difficult of communication to ", "out upon me from a doorway, and whispering the word ??Marriagelicence?? in my ear, was with great difficulty prevented from taking ", "I learned, was the presiding judge. In the space within the horseshoe, lower than these, that is to say, on about the level of the ")


#### 8. saveAsTextFile(path)

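Writes the elements of the dataset as one or more text files in a given directory on the local filesystem, HDFS, or any other Hadoop-supported file system. Spark calls toString() on each element to convert it to a line of text in the file.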

In [9]:
lines.take(5)

res4: Array[String] = Array("ELECBOOK CLASSICS ", "DAVID ", "COPPERFIELD ", "", "Charles Dickens ")


In [35]:
lines.coalesce(1,true).saveAsTextFile("../data/out/a.txt")

#### 9. saveAsSequenceFile(path)
(Java and Scala)

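Writes the elements of the dataset as a Hadoop SequenceFile. This is available only on RDDs of key-value pairs whose keys and values implement Hadoop's Writable interface. In Scala, basic types such as Int, Double, and String are implicitly convertible to Writable.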

In [14]:
lines.saveAsSequenceFile("../data/out/a.txt")

<console>: 28: error: value saveAsSequenceFile is not a member of org.apache.spark.rdd.RDD[String]
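
The error above occurs because `lines` is an RDD[String], while saveAsSequenceFile is only defined for RDDs of key-value pairs. A possible working variant is sketched below, assuming the notebook's `sc` running in local mode; the temporary output directory and the sample pairs are hypothetical choices for illustration. The file is read back with `sc.sequenceFile` to check the round trip.

```scala
import java.nio.file.Files

// Assumes `sc` is the SparkContext provided by the notebook / spark-shell (local mode).
// Hypothetical output location: a fresh sub-directory of a temp dir (it must not exist yet)
val seqPath = Files.createTempDirectory("seqfile-demo").resolve("out").toString

// A pair RDD: String keys and Int values are implicitly converted to Writable types
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
pairs.saveAsSequenceFile(seqPath)

// Read the SequenceFile back as (String, Int) pairs
val restored = sc.sequenceFile[String, Int](seqPath).collect().sorted
// restored: Array((a,1), (b,2))
```

For the `lines` dataset, one option would be to build pairs first, for example with `lines.map(line => (line.length, line))`, before saving.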

#### 10. saveAsObjectFile(path)
(Java and Scala)

Writes the elements of the dataset in a simple format using Java serialization; they can then be loaded with SparkContext.objectFile().

In [16]:
lines.saveAsObjectFile("../data/out/a.txt")

#### 11. countByKey()

Only available on RDDs of type (K, V). Returns a hashmap of (K, Long) pairs with the count of each key.

In [19]:
var wordPairs = lines.flatMap(line => line.split(" "))
                     .map(s => (s, 1))

wordPairs: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[26] at map at <console>:27


In [21]:
var linesCountByKey = wordPairs.countByKey()

linesCountByKey: scala.collection.Map[String,Long] = Map(professed -> 2, chary -> 1, strange!?? -> 1, herself. -> 14, face??on??y -> 1, incident -> 5, visit??just -> 1, serious -> 15, brink -> 2, pretensions. -> 2, blushes), -> 1, youthful -> 17, sinister -> 2, comply -> 1, aunt? -> 1, ebb -> 2, breaks -> 3, ??is -> 26, savour, -> 1, sneezed -> 1, forgotten -> 15, yonder??? -> 1, precious -> 15, leer -> 1, stop, -> 3, disgraceful.?? -> 1, compliment -> 5, frostily, -> 1, perhaps??something -> 1, might,?? -> 1, hourly -> 1, got, -> 4, distressed, -> 2, door-post -> 2, respecting -> 8, seclusion, -> 1, ??CANTERBURY, -> 1, evermore.?? -> 1, friendless -> 1, sputtering -> 2, Mell; -> 1, lover -> 2, of. -> 19, lead. -> 1, malignant -> 1, admirably: -> 1, speaker -> 2, ever? -> 1, human, -> 1...


#### 12. foreach(func)

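Runs the function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.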

**Note:** Modifying variables other than Accumulators that are defined outside of foreach() may result in undefined behavior.

In [24]:
var linesForeach = lines.take(5).foreach(line => print(line.toUpperCase()))

ELECBOOK CLASSICS DAVID COPPERFIELD CHARLES DICKENS 

linesForeach: Unit = ()
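
Note that the cell above calls foreach on the Array returned by take(5), so the printing runs on the driver and not through RDD.foreach. The following sketch calls foreach on an RDD and updates an accumulator, as the note recommends. It assumes the notebook's `sc` and a Spark version that provides `sc.longAccumulator` (2.0 or later); the accumulator name and sample data are illustrative.

```scala
// Assumes `sc` is the SparkContext provided by the notebook / spark-shell (Spark 2.0+).
val blankLines = sc.longAccumulator("blankLines")
val sampleLines = sc.parallelize(Seq("a", "", "b", ""))

// Runs on the executors; the accumulator is the safe way to report back to the driver
sampleLines.foreach(line => if (line.isEmpty) blankLines.add(1))

val blankCount = blankLines.sum
// blankCount: 2
```

Incrementing an ordinary `var` counter inside the closure instead would update a copy on each executor, which is the unpredictable behavior the note warns about.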
