In [1]:
import findspark

findspark.init()

In [2]:
from pyspark import SparkConf,SparkContext

# 创建RDD

In [3]:
#创建SparkContext对象
conf=SparkConf().setMaster("local").setAppName("my app")
sc=SparkContext(conf=conf)

In [4]:
#法一：将一个已有的集合传给SparkContext.paralleize() 不常用，因为要先将数据都读到内存
lines=sc.parallelize(["pandas","i like pandas"])
type(lines)

pyspark.rdd.RDD

In [5]:
#法二：从外部数据中读取（惰性）
lines=sc.textFile("README.md")
type(lines)

pyspark.rdd.RDD

# RDD操作——转化操作 （由一个RDD生成一个新的RDD）
# 注意：spark在转化操作中并不会计算，遇到第一个行动操作时才计算

### 对元素的操作

In [6]:
pythonLines=lines.filter(lambda line:"Python" in line)
for line in pythonLines.take(pythonLines.count()):
    print(line)

high-level APIs in Scala, Java, Python, and R, and an optimized engine that
## Interactive Python Shell
Alternatively, if you prefer Python, you can use the Python shell:


In [7]:
#map 对每个元素做操作
lines=sc.parallelize(["hello world","hi hawaii"])
words=lines.map(lambda line:line.split(" "))
for word in words.take(words.count()):
    print (word)

['hello', 'world']
['hi', 'hawaii']


In [8]:
#flatMap 对每个输入元素生成多个输出元素
lines=sc.parallelize(["hello world","hi hawaii"])
words=lines.flatMap(lambda line:line.split(" "))
for word in words.take(words.count()):
    print (word)

hello
world
hi
hawaii


### 伪集合操作

In [9]:
rdd1=sc.parallelize([1,2,3,3])
rdd2=sc.parallelize([1,2,2,4])

In [10]:
#union 连接(不会去重)
rdd=rdd1.union(rdd2)
for i in rdd.take(rdd.count()):
    print (i)

1
2
3
3
1
2
2
4


In [11]:
#distinct 去重
rdd3=rdd1.distinct()
for i in rdd3.take(rdd3.count()):
    print (i)

1
2
3


In [12]:
#intersection 求公共元素
rdd3=rdd1.intersection(rdd2)
for i in rdd3.take(rdd3.count()):
    print (i)

2
1


In [13]:
#subtract 减
rdd3=rdd1.subtract(rdd2)
for i in rdd3.take(rdd3.count()):
    print (i)

3
3


In [14]:
#cartesian 求笛卡尔积
rdd3=rdd1.cartesian(rdd2)
for i in rdd3.take(rdd3.count()):
    print (i)

(1, 1)
(1, 2)
(1, 2)
(1, 4)
(2, 1)
(2, 2)
(2, 2)
(2, 4)
(3, 1)
(3, 2)
(3, 2)
(3, 4)
(3, 1)
(3, 2)
(3, 2)
(3, 4)


In [None]:
#sample 取样
rdd3=rdd1.sample(False,0.5)
rdd3.count()

# RDD操作——行动操作（会对RDD计算出一个结果）
# 结果(是list类型)返回到驱动器程序或写入外部系统

### 注意，每次行动操作spark都会从头计算，可以用persist将中间结果长久化到内存，以便重复使用

In [18]:
rdd=sc.parallelize([1,2,3,3])

In [19]:
#collect 将数据返回驱动器程序（要求所有数据必须能放入同一台单机内存中）
print (type(rdd))
print (type(rdd.collect()))

<class 'pyspark.rdd.RDD'>
<type 'list'>


In [20]:
#take（n）返回rdd中n个元素，但是并不保证顺序
l1=rdd.take(2)
print (l1)

[1, 2]


In [21]:
#top 取前几个数据
l1=rdd.top(3)
print (l1)

[3, 3, 2]


In [22]:
#takeSample 采样 
l1=rdd.takeSample(False,3)
print (l1)

[3, 1, 2]


In [23]:
#foreach 对每个元素进行特定操作 但不会返回任何数据到驱动器程序   ？？？？
def g(x):
    print (x)
rdd.foreach(g)

In [24]:
#count 返回个数
print (rdd.count())

4


In [25]:
#countByValue 按值统计
print (rdd.countByValue())

defaultdict(<type 'int'>, {1: 1, 2: 1, 3: 2})


In [39]:
#reduce 并行整和rdd中所有数据
print (rdd.reduce(lambda x,y:(x+y))) #x是指返回值，y是对rdd元素的遍历
print (rdd.reduce(lambda x,y:(x*y)))

9
18


In [36]:
#fold 跟reduce一样，但要提供初始值
def add(x,y):
    return x+y
print (rdd.fold(10,add)) #1+2+10=13   3+3+10=16   13+16=29
print (rdd.fold(0,add))


29
9


In [42]:
#aggregate 
sumCount=rdd.aggregate((0,0),
                      (lambda x,y:(x[0]+y,x[1]+1)),#单个累加器的 累加和计数
                      (lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1]))#因为是分布式计算的，将多个累加器合并起来
                      )
print(sumCount[0]/float(sumCount[1]))

2.25


# RDD的持久化

In [43]:
rdd.persist()

ParallelCollectionRDD[48] at parallelize at PythonRDD.scala:480