In [6]:
from pyspark import SparkContext
from numpy import array

In [7]:
sc = SparkContext()

#### 生成广播变量rdd.broadcast()
通过broadcast()方法可以在Spark各个分区都生成一个新的rdd，在各个分区都可以通过rdd.value调用他，因而避免了变量传输过程中的损耗

In [18]:
broadcast_var= sc.broadcast(array([1,2,3,4]))
print(broadcast_var.value)

[1 2 3 4]


#### rdd.parallelize()
从list或者turple生成新的rdd，两个参数为(变量,分区数)

In [19]:
input_data = sc.parallelize([['a',1],['b',2],['b',3],['c',4],['a',5],['b',6]], 2)

In [20]:
input_data.glom().collect()

[[['a', 1], ['b', 2], ['b', 3]], [['c', 4], ['a', 5], ['b', 6]]]

#### rdd.map()
对RDD中每个元素执行依次转换操作，传入一个函数类型的变量

In [23]:
input_key_value = input_data.map(lambda x:(x[0],x[1]))

In [24]:
input_key_value.collect()

[('a', 1), ('b', 2), ('b', 3), ('c', 4), ('a', 5), ('b', 6)]

#### rdd.filter(f)
过滤函数，选出符合f函数判断的元素，组成新的RDD

In [3]:
from operator import add

In [8]:
words = sc.parallelize(['zhangxin','yanglele','zhangyuanyang','zhangxiaofeng'])
words.collect()

['zhangxin', 'yanglele', 'zhangyuanyang', 'zhangxiaofeng']

In [11]:
count=words.count()
print('the RDD counts is->%i' %count)

In [28]:
words_new=words.filter(lambda x:'zhang' in x)
words_new.collect()

In [26]:
words_map=words.map(lambda x:x+'good')
words_map.collect()

In [10]:
adding=words.reduce(add)
adding

'zhangxinyanglelezhangyuanyangzhangxiaofeng'

#### rdd.reduceByKey(f)
针对不同的key，对其值进行f函数的操作，f函数传入两个参数，分别为前一个值或者计算后的结果和后一个值，直到同一个key对应的值全部被计算

In [25]:
sum_value = input_key_value.reduceByKey(lambda x,y:x+y)

In [26]:
sum_value.glom().collect()

[[('b', 11), ('c', 4)], [('a', 6)]]

#### mapPartitions(f, preservesPartitioning=False)
与map不同，map是对每一个元素用函数作用；而mapPartitions是
对每一个分区用一个函数去作用

In [6]:
x = sc.parallelize([1,2,3],2)

In [7]:
def f(iterator):yield sum(iterator)
xMP = x.mapPartitions(f)

In [9]:
x.glom().collect()

[[1], [2, 3]]

In [10]:
xMP.glom().collect()

[[1], [5]]

#### mapPartitionsWithIndex
与mapPartition相比，mapPartitionWithIndex能够保留分区索引，
函数的传入参数也是分区索引和iterator构成的键值对。

In [11]:
x=sc.parallelize([1,2,3],2)
def f1(partitionIndex,iterator):yield (partitionIndex,sum(iterator))
def f2(partitionIndex,iterator):yield sum(iterator)
xMP1=x.mapPartitionsWithIndex(f1)
xMP2=x.mapPartitionsWithIndex(f2)
print(x.glom().collect())
print(xMP1.glom().collect())
print(xMP2.glom().collect())

[[1], [2, 3]]
[[(0, 1)], [(1, 5)]]
[[1], [5]]


#### rdd.zip(other)
将第一个rdd的元素作用键，第二个rdd的元素作为值，组成新rdd的元素。

In [12]:
x=sc.parallelize(['B','A','A'])
y=x.map(lambda x:ord(x))
z=x.zip(y)
print(x.collect())
print(y.collect())
print(z.collect())

['B', 'A', 'A']
[66, 65, 65]
[('B', 66), ('A', 65), ('A', 65)]


#### rdd.keyBy(f)
为rdd中的每个元素按照函数f生成一个键，新rdd的元素以元组形式存在。

In [13]:
x=sc.parallelize([1,2,3])
y=x.keyBy(lambda x:x**2)
print(x.collect())
print(y.collect())

[1, 2, 3]
[(1, 1), (4, 2), (9, 3)]


#### rdd.foreach(f)
对RDD中的每个元素使用函数来作用,由于是直接对每个元素操作并产生结果，
所以得到的结果不是rdd,而是普通python对象。这与foreachPartition不同。

In [17]:
rdd_data=sc.parallelize([1,2,3,4,5],2)
print(rdd_data.glom().collect())

[[1, 2], [3, 4, 5]]


In [20]:
rdd_data.collect()

[1, 2, 3, 4, 5]

In [18]:
def f(x):
    print(x)
list_new=rdd_data.foreach(f)

In [19]:
f(1)

1


In [21]:
inputData=sc.parallelize([1,2,3])
def f(x):#定义一个将内容追加于文件末尾的函数
    with open('./example.txt','a+') as fl:
        print(x,file=fl)

open('./example.txt','w').close()#操作之前先关闭之前可能存在的对该文件的写操作
y=inputData.foreach(f)
print(y)

with open('./example.txt') as fl:
    print(fl.read())

None
3
2
1



#### rdd.groupByKey(numPartitions=None, partitionFunc=)
原rdd为键值对，groupByKey()则将原rdd的元素相同键的值编进一个sequence
（不知道与list和iterator的不同有多大，可以暂时当成iterator看）

In [32]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rddGp=rdd.groupByKey()
print(rdd.collect())
print(rddGp.collect())
#从结果看，确实是将键相同的值编到一个序列里了，但类型很奇怪。这样看没有什么用处。
#但是后面可以接其他函数，一般都接mapValues(f),这样就可以完成按对值的一些操作。

[('a', 1), ('b', 1), ('a', 1)]
[('b', <pyspark.resultiterable.ResultIterable object at 0x00000287F6768470>), ('a', <pyspark.resultiterable.ResultIterable object at 0x00000287F67684A8>)]


In [33]:
def f(x):
    a=list(x)#直接使用x会报错，说明sequence并不能用for
    for i in range(len(a)):
        a[i]=a[i]*2
    return a

gpMp1=rddGp.mapValues(len)
gpMp2=rddGp.mapValues(list)
gpMp3=rddGp.mapValues(f)
print(gpMp1.collect())
print(gpMp2.collect())
print(gpMp3.collect())

[('b', 1), ('a', 2)]
[('b', [1]), ('a', [1, 1])]
[('b', [2]), ('a', [2, 2])]


#### rdd.groupBy(f, numPartitions=None, partitionFunc=)
groupBy()的用法与groupByKey相似，但传入参数多了f,传入的函数f可以把它当成用来生成新的key的。
它也围绕这个潜在的key将值编进一个序列。可以看得出来，它比groupByKey更灵活。

In [34]:
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
result=rdd.groupBy(lambda x:x%2)#按余数来分组
#后面紧接着一般是mapValues函数做进一步处理，这里我们直接获取该数据而不做进一步处理。
print(result.collect())
#显示保存在<pyspark.resultiterable.ResultIterable object at ...>中元素
resultGp=[(x,sorted(y)) for (x,y) in result.collect()]
print(resultGp)

[(0, <pyspark.resultiterable.ResultIterable object at 0x00000287F675BEB8>), (1, <pyspark.resultiterable.ResultIterable object at 0x00000287F675BEF0>)]
[(0, [2, 8]), (1, [1, 1, 3, 5])]


#### rdd.reduce(f)
reduce函数是将rdd中的每个元素两两之间按函数f进行操作，然后再结果再两两之间按f进行操作，一直进行下去，即所谓的shuffle过程。
reduce得到的结果是普通的python对象，而不是rdd.

In [35]:
ex_re = sc.parallelize([i for i in range(1,5)])

In [37]:
ex_re.reduce(lambda x,y:x+y)

10

#### rdd.keys()
原rdd的元素为键值对，返回原rdd元素的键为元素的rdd

#### rdd.values()
原rdd的元素为键值对，返回原rdd元素的值为元素的rdd

In [42]:
ex_k_v = sc.parallelize([(i,i**3) for i in range(1,5)])

In [43]:
ex_k = ex_k_v.keys()
ex_v = ex_k_v.values()

In [45]:
print(ex_k.collect())
print(ex_v.collect())

[1, 2, 3, 4]
[1, 8, 27, 64]


#### fold(zeroValue, op)[source]
与partitionBy很像，只不过有一个起始值。fold函数是按分区对每个元素进行操作，即先每个元素与起始值按op进行操作，得到的结果再两两之间按op操作，
一直进行下去得到分区结果，然后再将分区结果按op操作。

In [47]:
x=sc.parallelize([1,2,3],2)
y=x.fold(1,lambda x,y:x+y)
print(x.collect())
#结果为：[1, 2, 3]
print(y)

[1, 2, 3]
9


#### RDD.aggregate(zeroValue, seqOp, combOp)
aggregate与fold相似又很不同。seqOp操作会聚合各分区中的元素，然后combOp操作把所有分区的聚合结果再次聚合，两个操作的初始值都是zeroValue. 
seqOp的操作是遍历分区中的所有元素(T)，zeroValue跟第一个T做操作，结果再作为与第二个T做操作的zeroValue，直到遍历完整个分区。combOp操作是把各分区聚合的结果，
再聚合,zeroValue与第一个分区结果聚合，聚合结果相当于新的zeroValue，再与第二个分区结果聚合，一直进行下去。
aggregate函数返回一个跟RDD不同类型的值。因此，需要一个操作seqOp来把分区中的元素T合并成一个U，另外一个操作combOp把所有U聚合。

In [48]:
seqOp=(lambda x,y:(x[0]+y,x[1]+1))
combOp=(lambda x,y:(x[0]+y[0],x[1]+y[1]))
x=sc.parallelize([1,2,3,4,5,6],2)
print(x.glom().collect())
#结果为：[[1, 2, 3], [4, 5, 6]]
y=x.aggregate((1,2),seqOp,combOp)
print(y)
#结果为：(24, 12)

#计算过程如下：
#（1，2）--》（1+1，2+1）-->(2+2,3+1)-->(4+3，4+1)-->(7,5)；
#（1，2）--》（1+4，2+1）-->(5+5,3+1)-->(10+6，4+1)-->(16,5)；
#（1，2）--》（1+7，2+5）-->(8+16,7+5)-->(24，12)；

[[1, 2, 3], [4, 5, 6]]
(24, 12)


#### RDD.aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None, partitionFunc=)
aggregate是按分区进行，而aggregateByKey是按键来进行，但是zeroValue与aggregate中的用法很不一样，这里的zeroValue是一个值，它即可以跟这样键聚合，
也可以跟那个键聚合，而且zeroValue必须与键内聚合时定义的形式一致。

In [50]:
x=sc.parallelize([('B', 1), ('B', 2), ('B', 3), ('A', 4), ('A', 5),('A', 6)])
zeroValue=[7]
mergeVal=(lambda aggregated,el:aggregated+[(el,el**2)])
mergeComb=(lambda agg1,agg2:agg1+agg2)
y=x.aggregateByKey(zeroValue,mergeVal,mergeComb)
print(x.collect())
#结果为：[('B', 1), ('B', 2), ('B', 3), ('A', 4), ('A', 5), ('A', 6)]
print(y.collect())
#结果为：[('A', [7, (4, 16), (5, 25), (6, 36)]), ('B', [7, (1, 1), (2, 4), (3, 9)])]
#计算过程如下：
#('B', [7]);('B', (1,1**2))-->('B', [7,(1,1)])-->('B', [7,(1,1)])；('B', (2,2**2))-->('B', [7,(1,1),(2,4)])...-->
#[('B', [7, (1, 1), (2, 4), (3, 9)])]
#同时'A'也进行这样的过程
#[('B', [7, (1, 1), (2, 4), (3, 9)])];[('A', [7, (4, 16), (5, 25), (6, 36)])]-->
#[('A', [7, (4, 16), (5, 25), (6, 36)]), ('B', [7, (1, 1), (2, 4), (3, 9)])]

[('B', 1), ('B', 2), ('B', 3), ('A', 4), ('A', 5), ('A', 6)]
[('B', [7, (1, 1), 7, (2, 4), (3, 9)]), ('A', [7, (4, 16), 7, (5, 25), (6, 36)])]


13

In [51]:
zx=sc.parallelize([(x,x*2) for x in range(1,5)])

In [55]:
zxx = sc.parallelize([x for x in range(1,5)])

In [56]:
zxx.stats()

(count: 4, mean: 2.5, stdev: 1.118033988749895, max: 4.0, min: 1.0)

In [58]:
zxx.top(2)

[4, 3]

In [57]:
zxx.take(3)

[1, 2, 3]

In [53]:
zx_x = zx.reduce(add)

(1, 2, 2, 4, 3, 6, 4, 8)

#### rdd.countByValue()
统计rdd所有元素中各元素的个数

In [59]:
inputData=sc.parallelize([1,3,1,2,3])
inputCountBV=inputData.countByValue()
print(inputCountBV)
#结果为：defaultdict(<class 'int'>, {1: 2, 2: 1, 3: 2})

defaultdict(<class 'int'>, {1: 2, 3: 2, 2: 1})


#### pipe(command, env=None, checkCode=False)
通过管道向后面环节输出command处理过的结果，具体功能就体现在command，command为linux命令。

In [67]:
sda=sc.parallelize(['1', '2', '2', '3']).pipe('cat')

In [70]:
#再比如正则匹配来筛选
x=sc.parallelize(['A','Ba','C','DA'])
y=x.pipe('grep -i "A"')
print(x.collect())
print(y.collect())
#结果为：['A', 'Ba', 'C', 'DA']
#['A', 'Ba', 'DA']

['A', 'Ba', 'C', 'DA']
