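# RDD Programming
### 1. RDD Basics
An RDD is an immutable, distributed collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster.

There are two ways to create an RDD:
- load an external dataset
- distribute a collection of objects (such as a list or set) from the driver program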

RDDs support two kinds of operations:
- transformations
- actions


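**Transformations and actions differ in how Spark computes the RDD.**
Spark evaluates RDDs lazily: an RDD is only computed the first time it is used in an action.
For example, calling textFile() does not read the file. The read happens when first() is called, and it stops at the first element that satisfies the condition rather than scanning the whole file.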

By default, Spark recomputes an RDD every time an action is run on it. To reuse an RDD across several actions, call __RDD.persist()__ so that Spark caches it.

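__Every Spark program or shell session works as follows__
1. Create an input RDD from external data
2. Transform it with transformations such as filter() to define new RDDs
3. Ask Spark to persist() any intermediate RDDs that need to be reused
4. Launch actions (for example count() or first()) to trigger a parallel computation, which Spark optimizes before executing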

_Remark: cache() is the same as persist() with the default storage level._

---
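### 2. Creating RDDs
The simplest way to create an RDD is SparkContext.parallelize(). It requires the whole dataset to fit in the memory of one machine first, so it is mostly useful for prototyping and testing.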
### 3. RDD Operations
In general, a transformation returns **a new RDD**, while an action returns **some other data type**.

**Transformations**
```
inputRDD = sc.textFile(path)
pythonRDD = inputRDD.filter(lambda x : "python" in x)
scalaRDD = inputRDD.filter(lambda x : "Scala" in x)
PSRDD = pythonRDD.union(scalaRDD)
PSRDD.collect()
```

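**Actions**    
An action forces evaluation of the transformations required by the RDD it is called on.
take() retrieves a small number of elements from the RDD, while collect() retrieves the entire RDD. Use collect() only when the whole dataset fits in the memory of a single machine; it should not be used on large datasets.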
```
print("There a ",PSRDD.count()," rows data with python or scala")
print("Let's take some elements to see:\n")
for line in PSRDD.take(5):
    print(line)
```
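**Note: every time a new action is called, the entire RDD is recomputed from scratch. To avoid this inefficiency, persist intermediate results.**

**Lazy evaluation**

When we call a transformation on an RDD (for example map()), the operation is not executed immediately.    
Instead, Spark records the information about the requested operation.   
An RDD is best thought of not as a dataset holding specific data, but as a list of instructions, built up through transformations, on how to compute the data.   
We can force Spark to execute the transformations by calling an action such as count(). This is a simple way to test a program.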
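
The lazy behaviour described above can be imitated without Spark. The following is a pure-Python analogy, not Spark's implementation: the class `LazyDataset`, the helper `read_lines` and the sample lines are all hypothetical names made up for this sketch. It shows that a transformation only records a step, that first() stops reading at the first match, and that a second action starts again from the source.

```python
class LazyDataset:
    """Toy stand-in for an RDD: it stores a recipe, not the data (hypothetical class)."""

    def __init__(self, source, steps=()):
        self.source = source  # zero-argument callable that yields the raw records
        self.steps = steps    # recorded transformations, applied only by actions

    def filter(self, predicate):
        # Transformation: record the step and return a new dataset; nothing runs yet.
        return LazyDataset(self.source, self.steps + (("filter", predicate),))

    def map(self, func):
        # Transformation: same idea as filter().
        return LazyDataset(self.source, self.steps + (("map", func),))

    def _iterate(self):
        # Chain the recorded steps lazily on top of a fresh read of the source.
        records = self.source()
        for kind, func in self.steps:
            if kind == "filter":
                records = filter(func, records)
            else:
                records = map(func, records)
        return records

    def first(self):
        # Action: pulls records only until one passes every step.
        return next(iter(self._iterate()))

    def count(self):
        # Action: has to walk the whole source.
        return sum(1 for _ in self._iterate())


reads = []  # every line that is actually read from the pretend file


def read_lines():
    for line in ["Scala intro", "Python intro", "Java intro", "Python API"]:
        reads.append(line)
        yield line


lines = LazyDataset(read_lines)
python_lines = lines.filter(lambda line: "Python" in line)
reads_after_transformation = len(reads)  # nothing has been read yet

first_match = python_lines.first()
reads_after_first = len(reads)           # reading stopped at the first match

total = python_lines.count()
reads_after_count = len(reads)           # count() re-read the source from the start
```

In this toy model the second action repeats the read, which is the inefficiency that persist() is meant to avoid in Spark.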

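### 4. Passing Functions to Spark
- For short functions, pass a lambda expression
- Named functions defined with def can be passed as well
- For reference, the docstring of rdd.filter() reads: Return a new RDD containing only the elements that satisfy a predicate.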

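**Note**   
Python also serializes the object that contains the function being passed. When the function is a method of an object, or refers to a field of an object such as self.field, Spark sends the whole object to the worker nodes.   
This can ship far more data than needed, and the program may fail if Python does not know how to serialize the object.   
Don't do this:   
```
# Methods of a class that has a self.query field and a self.isMatch method
def getMatchesFunctionRef(self, rdd):
    # Problem: self.isMatch is a bound method, so all of self is serialized
    return rdd.filter(self.isMatch)
def getMatchesMemberRef(self, rdd):
    # Problem: the lambda refers to self, so all of self is serialized
    return rdd.filter(lambda x: self.query in x)
```
Do this instead:   
```
def getMatchesNoRef(self, rdd):
    # Copy the field into a local variable so that only the string is captured
    query = self.query
    return rdd.filter(lambda x: query in x)
```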
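
Why the local-variable version is lighter can be seen in plain Python, without Spark, by looking at what each lambda keeps in its closure. The sketch below rests on the assumption that whatever a function's closure holds has to travel with the function when it is serialized. The class `SearchFunctions`, its `big_table` field and the helper `captured_values` are hypothetical names used only for illustration.

```python
class SearchFunctions:
    """Hypothetical class with a small query and a large, unrelated field."""

    def __init__(self, query):
        self.query = query
        self.big_table = list(range(100000))  # data the filter never needs

    def matcher_with_self(self):
        # The lambda refers to self, so its closure holds the whole object.
        return lambda line: self.query in line

    def matcher_with_local(self):
        # Copy the field first; the closure then holds only the string.
        query = self.query
        return lambda line: query in line


def captured_values(func):
    """Return the objects stored in a function's closure cells."""
    return [cell.cell_contents for cell in (func.__closure__ or ())]


search = SearchFunctions("Python")
with_self = captured_values(search.matcher_with_self())    # the whole SearchFunctions object
with_local = captured_values(search.matcher_with_local())  # just the query string
```

Both lambdas filter identically; they differ only in how much state they carry along.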

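### 5. Common Transformations and Actions
__map() and flatMap()__  
_map() applies the given function to every input element and returns exactly one object per element.   
flatMap() combines two steps, "map first, then flatten":   
Step 1: as with map(), apply the function to every input element; here the function returns a collection of objects for each element.   
Step 2: flatten all of the returned collections into a single collection of elements._   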
```
nList = sc.parallelize(["Hello World","Hello Spark","Hello Python"])
mapRDD = nList.map(lambda x:x.split(" "))
flatMapRDD = nList.flatMap(lambda x:x.split(" "))
mapRDD.collect()
# Output: [['Hello', 'World'], ['Hello', 'Spark'], ['Hello', 'Python']]
flatMapRDD.collect()
# Output: ['Hello', 'World', 'Hello', 'Spark', 'Hello', 'Python']
```
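
For readers without a Spark installation, the same difference can be reproduced on a local list. This is only a pure-Python analogy of the two operations; `split_words` is a hypothetical helper name.

```python
from itertools import chain

lines = ["Hello World", "Hello Spark", "Hello Python"]


def split_words(line):
    return line.split(" ")


# map: exactly one output element per input element (here, one list per line)
mapped = [split_words(line) for line in lines]

# flatMap: apply the same function, then concatenate the returned lists
flat_mapped = list(chain.from_iterable(split_words(line) for line in lines))
```

`mapped` keeps one entry per input line, while `flat_mapped` contains the individual words.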
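__Pseudo set operations__  
The RDD.distinct() transformation produces a new RDD containing only the distinct elements. It is expensive, because it has to shuffle all of the data over the network.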
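
As a rough local model of distinct() and the other set-like operations exercised in the notebook cells later in this document, the sketch below uses plain Python lists and sets on the same sample words. It only mirrors the results, not the distributed execution. The sorting is there to make the output deterministic; Spark does not guarantee any particular order.

```python
from itertools import product

lines1 = ["coffee", "panda", "monkey", "tea", "coffee"]
lines2 = ["coffee", "monkey", "kitty"]

distinct_words = sorted(set(lines1))                    # like distinct(): duplicates removed
union_words = lines1 + lines2                           # like union(): duplicates kept
intersection_words = sorted(set(lines1) & set(lines2))  # like intersection(): duplicates removed
subtract_words = [w for w in lines1 if w not in set(lines2)]  # like subtract()
cartesian_pairs = list(product(lines1, lines2))         # like cartesian(): every (a, b) pair
```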

__Actions__
```
# reduce
nums = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
words = sc.parallelize(["Hello","Niko","Bellic"])
sumNums = nums.reduce(lambda x,y : x + y)
sumWords = words.reduce(lambda x,y : x + y)
# fold: the zero value should be the identity of the operation (0 for +)
foldNums = nums.fold(0,lambda x,y : x + y)
foldNums
# aggregate
sumCount = nums.aggregate((0,0),
               (lambda acc,value : (acc[0] + value,acc[1] + 1)),
               (lambda acc1,acc2 : (acc1[0]+acc2[0],acc1[1]+acc2[1])))
sumCount[0] / float(sumCount[1])
```
Commonly used actions: collect, count, countByValue, take, top, takeOrdered, takeSample, reduce, fold, aggregate, foreach
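
The role of the zero value in fold() and aggregate() is easier to see with a small simulation. The sketch below is a simplified pure-Python model under the assumption that the zero value is used once per partition and once more when the partial results are merged. The helpers `split_into_partitions`, `simulated_fold` and `simulated_aggregate` are hypothetical and not part of any Spark API.

```python
from functools import reduce


def split_into_partitions(data, num_partitions):
    """Cut the data into up to num_partitions contiguous chunks (hypothetical helper)."""
    size = -(-len(data) // num_partitions)  # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]


def simulated_fold(partitions, zero, op):
    # Each partition is folded starting from the zero value ...
    partials = [reduce(op, part, zero) for part in partitions]
    # ... and the partial results are merged, starting from the zero value again.
    return reduce(op, partials, zero)


def simulated_aggregate(partitions, zero, seq_op, comb_op):
    # seq_op folds elements into an accumulator; comb_op merges accumulators.
    partials = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, partials, zero)


def add(x, y):
    return x + y


nums = list(range(1, 11))

# With the identity 0, the partitioning does not matter.
fold_zero_two_parts = simulated_fold(split_into_partitions(nums, 2), 0, add)
fold_zero_five_parts = simulated_fold(split_into_partitions(nums, 5), 0, add)

# With 1 as the zero value, the result drifts with the number of partitions.
fold_one_two_parts = simulated_fold(split_into_partitions(nums, 2), 1, add)
fold_one_five_parts = simulated_fold(split_into_partitions(nums, 5), 1, add)

total, count = simulated_aggregate(
    split_into_partitions(nums, 3),
    (0, 0),
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
average = total / count
```

In this model a non-identity zero value is added once per partition plus once at the merge, which is why an identity element is the safe choice.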

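### 6. Persistence
Use rdd.persist() to keep frequently used data around; the default storage level keeps it in memory.  
In Python, persisted data is always stored in serialized (pickled) form, by default in the JVM heap.    
Call rdd.unpersist() to manually remove a persisted RDD from the cache.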
```
from pyspark import StorageLevel
input = sc.parallelize([1,2,3,4,5])
squaredRDD = input.map(lambda x: x*x)
squaredRDD.persist(StorageLevel.DISK_ONLY)
print(squaredRDD.count())
print(squaredRDD.collect())
```
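
The effect of persisting can be sketched without Spark by counting how often the mapping function runs. The class `ToyRDD` below is a hypothetical stand-in written for this illustration; it models only the idea that persist() marks the data and the first action afterwards fills the cache, not Spark's storage levels.

```python
calls = {"square": 0}  # counts how often the mapping function runs


def square(x):
    calls["square"] += 1
    return x * x


class ToyRDD:
    """Hypothetical stand-in for a mapped RDD: recomputes on every action unless persisted."""

    def __init__(self, data, func):
        self.data = data
        self.func = func
        self.persisted = False
        self.cache = None

    def persist(self):
        # Only marks the dataset; the cache is filled by the next action.
        self.persisted = True
        return self

    def unpersist(self):
        self.persisted = False
        self.cache = None
        return self

    def _compute(self):
        if self.cache is not None:
            return self.cache
        result = [self.func(x) for x in self.data]
        if self.persisted:
            self.cache = result
        return result

    def count(self):
        return len(self._compute())

    def collect(self):
        return list(self._compute())


squared = ToyRDD([1, 2, 3, 4, 5], square)
squared.count()
squared.collect()
calls_without_persist = calls["square"]  # both actions recomputed every element

calls["square"] = 0
squared.persist()
squared.count()
squared.collect()
calls_with_persist = calls["square"]     # the second action was served from the cache
```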



In [1]:
from pyspark import SparkConf,SparkContext
sc = SparkContext("local","RDD")

In [23]:
import os
path = "file://" + os.path.abspath(".") + "/quickstart.txt"
lines = sc.textFile(path)

This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark’s interactive shell (in Python or Scala), then show how to write applications in Java, Scala, and Python. See the programming guide for a more complete reference.
Spark’s shell provides a simple way to learn the API, as well as a powerful tool to analyze data interactively. It is available in either Scala (which runs on the Java VM and is thus a good way to use existing Java libraries) or Python. Start it by running the following in the Spark directory:
Python
Python
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
Python
Suppose we wish to write a self-contained application using the Spark API. We will walk through a simple application in Scala (with sbt), Java (with Maven), and Python.
Python
Finally, Spark includes several samples in the examples directory (Scala, Java, Python, R). You can run them as follo

### Calling the filter transformation

In [None]:
pythonLine = lines.filter(lambda line : "Python" in line)

### Calling the first action

In [24]:
pythonLine.first()

'This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark’s interactive shell (in Python or Scala), then show how to write applications in Java, Scala, and Python. See the programming guide for a more complete reference.'

### Persisting the RDD in memory

In [25]:
pythonLine.persist()

PythonRDD[29] at collect at <ipython-input-23-e90d135d5a23>:5

In [29]:
pythonLine.count()  # reuses the persisted RDD instead of recomputing it

10

In [27]:
pythonLine.first() # reuses the persisted RDD instead of recomputing it

'This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark’s interactive shell (in Python or Scala), then show how to write applications in Java, Scala, and Python. See the programming guide for a more complete reference.'

In [33]:
# The simplest way to create an RDD
lines = sc.parallelize(["panda","i like pandas"])
lines.collect()

['panda', 'i like pandas']

In [38]:
# Transformations
inputRDD = sc.textFile(path)
pythonRDD = inputRDD.filter(lambda x : "python" in x)
scalaRDD = inputRDD.filter(lambda x : "Scala" in x)
PSRDD = pythonRDD.union(scalaRDD)
PSRDD.collect()

['./bin/spark-submit examples/src/main/python/pi.py',
 'This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark’s interactive shell (in Python or Scala), then show how to write applications in Java, Scala, and Python. See the programming guide for a more complete reference.',
 'Spark’s shell provides a simple way to learn the API, as well as a powerful tool to analyze data interactively. It is available in either Scala (which runs on the Java VM and is thus a good way to use existing Java libraries) or Python. Start it by running the following in the Spark directory:',
 'Scala',
 'Scala',
 'This first maps a line to an integer value, creating a new RDD. reduce is called on that RDD to find the largest line count. The arguments to map and reduce are Scala function literals (closures), and can use any language feature or Scala/Java library. For example, we can easily call functions declared elsewhere. We’ll use Math.max() function to make

In [42]:
# Actions
print("There a ",PSRDD.count()," rows data with python or scala")
print("Let's take some elements to see:\n")
for line in PSRDD.take(5):
    print(line)

There a  12  rows data with python or scala
Let's take some elements to see:

./bin/spark-submit examples/src/main/python/pi.py
This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark’s interactive shell (in Python or Scala), then show how to write applications in Java, Scala, and Python. See the programming guide for a more complete reference.
Spark’s shell provides a simple way to learn the API, as well as a powerful tool to analyze data interactively. It is available in either Scala (which runs on the Java VM and is thus a good way to use existing Java libraries) or Python. Start it by running the following in the Spark directory:
Scala
Scala


### Passing functions to Spark

In [62]:
s = "Hello NikoBelic , My name is tom! Second line : Hey Nice to meet you"
s = sc.parallelize([s])
# word = s.filter(lambda x : "My" in x)
# word.first()

def containsName(s):
    return "name" in s
word = s.filter(containsName)
word.first()

'Hello NikoBelic , My name is tom! Second line : Hey Nice to meet you'

In [99]:
nList = sc.parallelize(["Hello World","Hello Spark","Hello Python"])
mapRDD = nList.map(lambda x:x.split(" "))
flatMapRDD = nList.flatMap(lambda x:x.split(" "))
mapRDD.collect()
flatMapRDD.collect()


['Hello', 'World', 'Hello', 'Spark', 'Hello', 'Python']

In [105]:
# Pseudo set operations
lines1 = sc.parallelize(["coffee","panda","monkey","tea","coffee"])
lines2 = sc.parallelize(["coffee","monkey","kitty"])
# distinct
distinctWords = lines1.distinct()
distinctWords.collect()
# union
unionWords = lines1.union(lines2)#.distinct()
unionWords.collect()
# intersection
intersectionWords = lines1.intersection(lines2)
intersectionWords.collect()
# subtract
subtractWords = lines1.subtract(lines2)
subtractWords.collect()
# cartesian
cartesianWords = lines1.cartesian(lines2)
cartesianWords.collect()

[('coffee', 'coffee'),
 ('coffee', 'monkey'),
 ('coffee', 'kitty'),
 ('panda', 'coffee'),
 ('panda', 'monkey'),
 ('panda', 'kitty'),
 ('monkey', 'coffee'),
 ('monkey', 'monkey'),
 ('monkey', 'kitty'),
 ('tea', 'coffee'),
 ('tea', 'monkey'),
 ('tea', 'kitty'),
 ('coffee', 'coffee'),
 ('coffee', 'monkey'),
 ('coffee', 'kitty')]

In [128]:
# Actions
# reduce
nums = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
words = sc.parallelize(["Hello","Niko","Bellic"])
sumNums = nums.reduce(lambda x,y : x + y)
sumWords = words.reduce(lambda x,y : x + y)
# fold: the zero value should be the identity of the operation (0 for +)
foldNums = nums.fold(0,lambda x,y : x + y)
foldNums
# aggregate
sumCount = nums.aggregate((0,0),
               (lambda acc,value : (acc[0] + value,acc[1] + 1)),
               (lambda acc1,acc2 : (acc1[0]+acc2[0],acc1[1]+acc2[1])))
sumCount[0] / float(sumCount[1])


5.5

In [7]:
# Persistence
from pyspark import StorageLevel
input = sc.parallelize([1,2,3,4,5])
squaredRDD = input.map(lambda x: x*x)
squaredRDD.persist(StorageLevel.DISK_ONLY)
print(squaredRDD.count())
print(squaredRDD.collect())

5
[1, 4, 9, 16, 25]
