### Basic

In [94]:
#_*_coding:utf-8_*_
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from pyspark.context import SparkConf
conf = SparkConf().setMaster("local[*]").setAppName("test")
sc = SparkContext.getOrCreate(conf=conf)

### RDD读取

In [106]:
###获取路径下所有文件 形成RDD
rdd = sc.wholeTextFiles("./data/")

In [107]:
rdd.collect()

[('file:/home/admp/wangzhan/jupyter/pyspark/data/wc1.txt',
  'hello world\nhello Ireland\nhello ML\nhello Neo4j'),
 ('file:/home/admp/wangzhan/jupyter/pyspark/data/wc2.txt',
  'hello google\nhello facebook\nhello XD\nhello UCAS')]

In [109]:
rdd.take(1)

[('file:/home/admp/wangzhan/jupyter/pyspark/data/wc1.txt',
  'hello world\nhello Ireland\nhello ML\nhello Neo4j')]

In [96]:
## 从文本中读取RDD
rdd = sc.textFile("wc.txt")
rdd.collect()

['hello world', 'hello spark', 'hello hive', 'hello hadoop']

In [97]:
## list创建RDD
rdd = sc.parallelize(['hello world', 'hello spark', 'hello hive', 'hello hadoop'])
rdd.collect()

['hello world', 'hello spark', 'hello hive', 'hello hadoop']

### word count example

In [98]:
## 单词分割
rdd0 = rdd.flatMap(lambda line:line.split(" "))
rdd0.collect()

['hello', 'world', 'hello', 'spark', 'hello', 'hive', 'hello', 'hadoop']

In [99]:
## map
rdd1 = rdd0.map(lambda x:(x,1))
rdd1.collect()

[('hello', 1),
 ('world', 1),
 ('hello', 1),
 ('spark', 1),
 ('hello', 1),
 ('hive', 1),
 ('hello', 1),
 ('hadoop', 1)]

In [100]:
##reduce
rdd2 = rdd1.reduceByKey(lambda a,b:a+b)
rdd2.collect()

[('hello', 4), ('world', 1), ('spark', 1), ('hive', 1), ('hadoop', 1)]

In [102]:
## 查看分区数
rdd.getNumPartitions()

1

In [104]:
rdd.glom().collect()

[['hello world', 'hello spark', 'hello hive', 'hello hadoop']]

In [105]:
## 重新分区
rdd.repartition(2).glom().collect()

[['hello world', 'hello spark', 'hello hive', 'hello hadoop'], []]

In [28]:
## 数据按照分区形式打印 
## 如果是四个分区
## 可能是[['hello world'], ['hello spark'], ['hello hive'], ['hello hadoop']]
rdd.glom().collect()

[['hello world', 'hello spark', 'hello hive', 'hello hadoop']]

### map flatMap

In [30]:
## flatMap 将map方法的结果展平(flatten)
rdd = sc.parallelize([1,2,3,4])

In [32]:
rdd0 = rdd.map(lambda x:range(x))
rdd0.collect()

[range(0, 1), range(0, 2), range(0, 3), range(0, 4)]

In [34]:
rdd1 = rdd.flatMap(lambda x:range(x))
rdd1.collect()

[0, 0, 1, 0, 1, 2, 0, 1, 2, 3]

### 聚合类 reduce fold/foldLeft/foldRight aggregate

In [55]:
rdd = sc.parallelize([1,2,3,4])
def add(a,b):
    """Return ``a + b``; a two-argument combiner usable as ``rdd.reduce(add)``."""
    total = a + b
    return total
rdd.reduce(lambda a,b:a+b) ##rdd.reduce(add)

10

In [56]:
rdd.fold(3,lambda a,b:a+b)

16

In [57]:
## 分区聚合 初始化两个分区
rdd =sc.parallelize([2,4,6,1],2)
rdd.glom().collect()

[[2, 4], [6, 1]]

In [58]:
## 每个分区聚合 用seqOp，分区之间聚合用combOp
zeroValue = 0
seqOp = lambda a,b:a+b
combOp = lambda x,y:x+y
rdd.aggregate(zeroValue,seqOp,combOp)

13

### filter distinct

In [59]:
rdd = sc.parallelize([1,2,3,4,5])
rdd1 = rdd.flatMap(lambda x:(x,x+1,x+2))
rdd1.collect()

[1, 2, 3, 2, 3, 4, 3, 4, 5, 4, 5, 6, 5, 6, 7]

In [61]:
rdd1.filter(lambda x:x%2==0).collect()

[2, 2, 4, 4, 4, 6, 6]

In [62]:
rdd1.distinct().collect()

[1, 2, 3, 4, 5, 6, 7]

### 交集intersection 并集union 排序sortBy 笛卡尔积cartesian

In [140]:
rdd0 = sc.parallelize(["B","B","A"])
rdd1 = sc.parallelize(["B","C"])
rdd0.intersection(rdd1).collect()

['B']

In [141]:
rdd0.union(rdd1).collect()

['B', 'B', 'A', 'B', 'C']

In [142]:
rdd0.sortBy(lambda x:x,ascending = True).collect()

['A', 'B', 'B']

In [143]:
rdd0.cartesian(rdd1).collect()

[('B', 'B'), ('B', 'C'), ('B', 'B'), ('B', 'C'), ('A', 'B'), ('A', 'C')]

### PairRdd的操作 groupByKey reduceByKey aggregateByKey reduceByKeyLocally

In [72]:
rdd = sc.parallelize(["zhangsan","lisi","wangwu","zhangsan","lisi","wangwu","zhangsan","lisi","wangwu","zhangsan","lisi","wangwu"])

In [73]:
rdd0 = rdd.map(lambda x:(x,1))

In [74]:
rdd0.collect()

[('zhangsan', 1),
 ('lisi', 1),
 ('wangwu', 1),
 ('zhangsan', 1),
 ('lisi', 1),
 ('wangwu', 1),
 ('zhangsan', 1),
 ('lisi', 1),
 ('wangwu', 1),
 ('zhangsan', 1),
 ('lisi', 1),
 ('wangwu', 1)]

In [87]:
rdd0.groupByKey().collect()

[('zhangsan', <pyspark.resultiterable.ResultIterable at 0x7f064c150cc0>),
 ('lisi', <pyspark.resultiterable.ResultIterable at 0x7f064c150cf8>),
 ('wangwu', <pyspark.resultiterable.ResultIterable at 0x7f064c5e39b0>)]

In [90]:
rdd0.reduceByKey(lambda a,b:a+b).collect()

[('zhangsan', 4), ('lisi', 4), ('wangwu', 4)]

In [91]:
zeroValue = 0
sqlFunc = lambda a,b:a+b
combFunc = lambda a,b:a+b
rdd3 = rdd0.aggregateByKey(zeroValue,sqlFunc,combFunc)

In [92]:
rdd3.collect()

[('zhangsan', 4), ('lisi', 4), ('wangwu', 4)]

In [93]:
## reduceByKeyLocally 返回字典
rdd0.reduceByKeyLocally(lambda a,b:a+b)

{'zhangsan': 4, 'lisi': 4, 'wangwu': 4}

### 算子分类

#### RDD Transformation 算子 
map filter distinct sample sortBy groupByKey sortByKey reduceByKey aggregateByKey sampleByKey

In [119]:
## 采样
rdd = sc.parallelize([1,2,3,2,3,4,2,2,3,4,5,3])
rdd.sample(withReplacement =True,fraction = 0.2).collect()  ## withReplacement 是否有放回采样

[1]

In [120]:
### 连写 比较长的python代码可以用（） 然后直接换行

In [121]:
rdd = sc.parallelize(["Hello world","hello hadoop","hadoop Spark"])

In [128]:
pairRdd = (rdd.
          flatMap(lambda x:x.split(" ")).
          map(lambda x:x.lower()).  ## 大小写转换
          map(lambda x:(x,1)).
          reduceByKey(lambda a,b:a+b))
pairRdd.collect()

[('hello', 2), ('world', 1), ('hadoop', 2), ('spark', 1)]

In [149]:
## sampleByKey 按key分层抽样: withReplacement=True 为泊松抽样, False 为伯努利抽样
rdd = sc.parallelize(["hello world","hello world","hello world"])
(rdd.
flatMap(lambda x:x.split(" ")).
map(lambda x:x.lower()).  ## 大小写转换
map(lambda x:(x,1)).
sampleByKey(withReplacement =True,fractions = {"hello":0.2,"world":0.2})).collect()  ### fractions 传入每个key的fraction

[('world', 1)]

In [154]:
### join
# join leftOuterJoin rightOuterJoin fullOuterJoin
rdd1 = sc.parallelize([('cat', 2), ('cat', 5), ('book', 4), ('cat', 12)])
rdd2 = sc.parallelize([('cat', 2), ('cup', 5), ('mouse', 4), ('cat', 12)])

In [155]:
rdd1.join(rdd2).collect()

[('cat', (2, 2)),
 ('cat', (2, 12)),
 ('cat', (5, 2)),
 ('cat', (5, 12)),
 ('cat', (12, 2)),
 ('cat', (12, 12))]

In [156]:
help(rdd1.join)

Help on method join in module pyspark.rdd:

join(other, numPartitions=None) method of pyspark.rdd.RDD instance
    Return an RDD containing all pairs of elements with matching keys in
    C{self} and C{other}.
    
    Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
    (k, v1) is in C{self} and (k, v2) is in C{other}.
    
    Performs a hash join across the cluster.
    
    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2), ("a", 3)])
    >>> sorted(x.join(y).collect())
    [('a', (1, 2)), ('a', (1, 3))]



#### Action 算子

In [None]:
### executor返回driver的API
#collect() take() first()  生产不用collect()，可能导致driver崩溃

### 输出
#foreach foreachPartition

###保存文件到外部存储系统
#saveAsTextFile

### Rdd 数据输出

In [158]:
#rdd.foreachPartition 可以连接MySQL Hbase,写入数据库和数据仓库
#dstream.foreachRDD
## saveAsTextFile  保存成textFile
## saveAsHadoopDataset(jobconf())
## TableOutputFormat
## saveAsHadoopFile

### example

In [179]:
### topN
import random
rdd = sc.parallelize([('cat', 2), ('cat', 5), ('book', 6), ('cat', 12),('book', 8),('book', 4)])

In [180]:
rdd.top(3,key = lambda x:x[1])

[('cat', 12), ('book', 8), ('book', 6)]

In [181]:
### topN 方法一 两阶段聚合（key +  随机前缀 + 局部聚合 + 全局聚合）
### 防止数据倾斜 添加随机前缀
### 局部取top2
### 局部的top2里面求全局top2

In [182]:
def topn(key,iter,n=2):
    """Pair ``key`` with the ``n`` largest values found in ``iter``.

    Args:
        key: Label attached to every selected value.
        iter: Iterable of mutually comparable values. The name shadows the
            built-in ``iter`` but is kept so keyword callers keep working.
        n: How many of the largest values to keep. Defaults to 2, the limit
            that used to be hard-coded.

    Returns:
        A lazy iterator of ``(key, value)`` tuples in descending value order.

    Raises:
        ValueError: If ``n`` is negative.
    """
    if n < 0:
        # A negative slice bound would silently drop values from the tail
        # instead of limiting the result size.
        raise ValueError("n must be non-negative")
    largest = sorted(iter,reverse = True)[:n]
    return ((key,value) for value in largest)

In [211]:
# Two-stage top-2 per key, written so that a hot key is spread over several groups:
#   1. mapPartitions rewrites every (key, value) as ((salt, key), value), where
#      salt is random.randint(1,10), i.e. an integer from 1 to 10 inclusive.
#   2. The first groupByKey groups on (salt, key); topn keeps the two largest
#      values of each salted group and emits them under the original key (x[0][1]).
#   3. The second groupByKey groups on the original key; topn keeps the two
#      largest of the surviving candidates.
# Only transformations are chained here; no action (e.g. collect) is called on
# `result` in this cell.
result = (rdd.mapPartitions(lambda iter:map(lambda x:((random.randint(1,10),x[0]),x[1]),iter))
         .groupByKey()
         .flatMap(lambda x:topn(x[0][1],x[1]))
         .groupByKey()
         .flatMap(lambda x:topn(x[0],x[1])))

In [223]:
rdd.collect()

[('cat', 2), ('cat', 5), ('book', 6), ('cat', 12), ('book', 8), ('book', 4)]

In [213]:
### 方式2 使用aggregateByKey  两阶段聚合函数

In [234]:
def f(a,b):
    """Fold one value into a running top-2 collection without modifying it.

    Args:
        a: Iterable holding the largest values collected so far.
        b: New value to merge in.

    Returns:
        A new list with the two largest values among ``a`` and ``b`` in
        descending order (fewer if fewer values exist).
    """
    # Build a fresh list instead of appending to ``a``: the appended element
    # used to leak into the caller's list even though a new list is returned.
    return sorted([*a, b],reverse = True)[:2]
def h(x,y):
    """Merge two partial results and keep the two largest values.

    ``x`` and ``y`` are concatenated with ``+``; the returned list is new and
    sorted in descending order.
    """
    merged = list(x + y)
    merged.sort(reverse = True)
    return merged[:2]
rdd.aggregateByKey(zeroValue = [],seqFunc = lambda a,b:f(a,b),combFunc = lambda x,y:h(x,y)).collect()

[('cat', [12, 5]), ('book', [8, 6])]