### 导入pyspark并初始化

In [1]:
#初始化
from pyspark import SparkConf,SparkContext
sc = SparkContext()

In [2]:
#创建RDD
intRDD = sc.parallelize([3,1,2,5,5])
stringRDD = sc.parallelize(['Apple','Orange','Grape','Banana','Apple'])

In [3]:
print (intRDD.collect())
print (stringRDD.collect())

[3, 1, 2, 5, 5]
['Apple', 'Orange', 'Grape', 'Banana', 'Apple']


### map运算

In [5]:
print(intRDD.map(lambda x:x+1).collect())

[4, 2, 3, 6, 6]


### filter运算
filter运算对RDD内每一个元素进行筛选，并产生另一个RDD

In [9]:
print (intRDD.filter(lambda x: x<3).collect())
print (stringRDD.filter(lambda x:'ra' in x).collect())

[1, 2]
['Orange', 'Grape']


### distinct运算
删除重复的元素

In [12]:
print(intRDD.distinct().collect())

[1, 5, 2, 3]


### random split
按照比例将随机数进行切分

In [13]:
sRDD = intRDD.randomSplit([0.4,0.6])
print(len(sRDD))
print(sRDD[0].collect(),sRDD[1].collect())

2
[3] [1, 2, 5, 5]


### groupBy  agg
聚合函数

In [14]:
result = intRDD.groupBy(lambda x : x % 2).collect()
print (sorted([(x, sorted(y)) for (x, y) in result]))

[(0, [2]), (1, [1, 3, 5, 5])]


### 多个RDD运算

In [15]:
intRDD1 = sc.parallelize([3,1,2,5,5])
intRDD2 = sc.parallelize([5,6])
intRDD3 = sc.parallelize([2,7])

In [19]:
#并集运算
print (intRDD1.union(intRDD2).union(intRDD3).collect())

[3, 1, 2, 5, 5, 5, 6, 2, 7]


In [20]:
#交集预算
print (intRDD1.intersection(intRDD2).collect())

[5]


In [21]:
#差集运算
print(intRDD1.subtract(intRDD2).collect())

[1, 2, 3]


In [22]:
#笛卡尔积运算
print (intRDD1.cartesian(intRDD2).collect())

[(3, 5), (3, 6), (1, 5), (1, 6), (2, 5), (2, 6), (5, 5), (5, 5), (5, 6), (5, 6)]


### 基本动作运算

#### 
读取元素，Action运算所以会马上执行

In [25]:
#读取第一条数据
print(intRDD.first())
#取前2条数据
print(intRDD.take(2))
#取前三条，并升序排列
print(intRDD.takeOrdered(3))
#取前三条并降序排列
print(intRDD.takeOrdered(3,lambda x:-x))

3
[3, 1]
[1, 2, 3]
[5, 5, 3]


In [26]:
#统计功能
#统计
print (intRDD.stats())
#最小值
print (intRDD.min())
#最大值
print (intRDD.max())
#标准差
print (intRDD.stdev())
#计数
print (intRDD.count())
#求和
print (intRDD.sum())
#平均
print (intRDD.mean())

(count: 5, mean: 3.2, stdev: 1.6, max: 5.0, min: 1.0)
1
5
1.6
5
16
3.2


### RDD Key-Val 基本转换操作  

In [27]:
#初始化
kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])

In [30]:
#得到key，和value
print(kvRDD1.keys().collect())
#print(kvEDD1.keys().collect())

[3, 3, 5, 1]


In [32]:
#筛选元素
print (kvRDD1.filter(lambda x:x[0] < 5).collect())
print (kvRDD1.filter(lambda x:x[1] < 5).collect())

[(3, 4), (3, 6), (1, 2)]
[(3, 4), (1, 2)]


In [33]:
#值运算mapValues
print (kvRDD1.mapValues(lambda x:x**2).collect())

[(3, 16), (3, 36), (5, 36), (1, 4)]


In [34]:
#按key使用sortBYKey进行排序  默认升序
print (kvRDD1.sortByKey().collect())
print (kvRDD1.sortByKey(True).collect())
print (kvRDD1.sortByKey(False).collect())

[(1, 2), (3, 4), (3, 6), (5, 6)]
[(1, 2), (3, 4), (3, 6), (5, 6)]
[(5, 6), (3, 4), (3, 6), (1, 2)]


In [35]:
#合并key值的数据
#使用reduceByKey可以对相同key的值进行合并
print (kvRDD1.reduceByKey(lambda x,y:x+y).collect())

[(5, 6), (1, 2), (3, 10)]


### 多个RDD key-value转换运算·

In [36]:
kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])
kvRDD2 = sc.parallelize([(3,8)])

In [37]:
#内连接运算
#将两个RDD按照相同的key值join起来
print (kvRDD1.join(kvRDD2).collect())

[(3, (4, 8)), (3, (6, 8))]


In [None]:
#左外连接左边元素全显示，右边没有的显示NOne
print (kvRDD1.leftOuterJoin(kvRDD2).collect())

In [40]:
#右外连接，右边元素全显示，左边元素没有显示None
print(kvRDD1.rightOuterJoin(kvRDD2).collect())

[(3, (4, 8)), (3, (6, 8))]


In [41]:
#删除相同key值的数据
#使用函数subtractByKey
print (kvRDD1.subtractByKey(kvRDD2).collect())

[(1, 2), (5, 6)]


### key-val动作操作

In [45]:
#按key值统计
#使用countByKey可以统计各个key的元素个数
print (kvRDD1.countByKey())

defaultdict(<class 'int'>, {3: 2, 5: 1, 1: 1})


In [46]:
#lookup查找函数
print (kvRDD1.lookup(3))

[4, 6]


In [None]:
i.