### Pair RDD =>常用於聚合任務

In [2]:
## 允許平行地對每個鍵進行操作,或是跨領域重新分群


### 可以使用所以標準RDD可用的轉換操作

In [140]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App") #local ==> to local 執行續
sc = SparkContext(conf = conf)



In [None]:
lines = sc.parallelize(["pandas","i like pandas"])

In [6]:
### 建立一個pair RDD,Python需要回傳一個tuple
pairs = lines.map(lambda x: (x.split(" ")[0], x))

In [13]:
pairs.collect()

[('pandas', 'pandas'), ('i', 'i like pandas')]

In [14]:
type(pairs)

pyspark.rdd.PipelinedRDD

### Python 從記憶體內的集合物件建立pairRDD時,只需要對pairs集合呼叫SparkContext.parallelize()

In [197]:

num = sc.parallelize({(1,2),(3,4),(3,6)})

In [60]:
a = num.reduceByKey(lambda x,y:x+y).collect()
a

[(1, 2), (3, 10)]

In [63]:
b = num.groupByKey().collect()
b

[(1, <pyspark.resultiterable.ResultIterable at 0x7f09e84271d0>),
 (3, <pyspark.resultiterable.ResultIterable at 0x7f09e8427b10>)]

In [65]:
c = num.mapValues(lambda x:x+1).collect()
c

[(1, 3), (3, 5), (3, 7)]

In [66]:
d = num.keys().collect()
d

[1, 3, 3]

In [68]:
e = num.values().collect()
e

[2, 4, 6]

In [69]:
f = num.sortByKey().collect()
f

[(1, 2), (3, 4), (3, 6)]

In [71]:
# flatMapValues
x = sc.parallelize([('A',(1,2,3)),('B',(4,5))])
y = x.flatMapValues(lambda x: [i**2 for i in x]) # function is applied to entire value, then result is flattened
print(x.collect())
print(y.collect())


[('A', (1, 2, 3)), ('B', (4, 5))]
[('A', 1), ('A', 4), ('A', 9), ('B', 16), ('B', 25)]


In [79]:
g = num.flatMapValues(lambda x: str(x+1)).collect()
g

[(1, '3'), (3, '5'), (3, '7')]

In [80]:
other = sc.parallelize({(3,9)})

In [83]:
h = num.subtractByKey(other).collect()
h

[(1, 2)]

In [84]:
#two RDD to do the Inner Join
i = num.join(other).collect()
i

[(3, (4, 9)), (3, (6, 9))]

In [85]:
#Two RDD right outer join
j = num.rightOuterJoin(other).collect()
j

[(3, (4, 9)), (3, (6, 9))]

In [86]:
#Two RDD left outer join
k = num.leftOuterJoin(other).collect()
k

[(1, (2, None)), (3, (4, 9)), (3, (6, 9))]

In [87]:
# Base on two RDD to let them diveid two group
l = num.cogroup(other).collect()
l

[(1,
  (<pyspark.resultiterable.ResultIterable at 0x7f09e8432ad0>,
   <pyspark.resultiterable.ResultIterable at 0x7f09e84ce090>)),
 (3,
  (<pyspark.resultiterable.ResultIterable at 0x7f09e84ce390>,
   <pyspark.resultiterable.ResultIterable at 0x7f09e84ce050>))]

In [106]:
inputRDD = sc.textFile("/home/jordan/Desktop/input.txt")

In [112]:
result = inputRDD.filter(lambda keyValue: len(keyValue[1]) < 5)

In [113]:
result.collect()

[u'INFO This is a message with content',
 u'INFO This is a some other content',
 u'(empty line)',
 u'INFO Here are more messages',
 u'(empty line)',
 u'ERROR Something bad happened',
 u'WARN More details on the bad thing',
 u'INFO back to normal messages']

### 聚合 to return a (key,key's cal of reduce)

In [119]:
aggre = sc.parallelize({("panada",0),("pink",3),("pirate",3),("panada",1),("pink",4)})

In [120]:
aggre.mapValues(lambda x: (x, 1)).collect()

[('panada', (1, 1)),
 ('pink', (3, 1)),
 ('pink', (4, 1)),
 ('panada', (0, 1)),
 ('pirate', (3, 1))]

In [121]:
aggre.mapValues(lambda x: (x, 1)).reduceByKey(lambda x,y: (x[0] + y[0], x[1] + y[1])).collect()

[('pink', (7, 2)), ('panada', (1, 2)), ('pirate', (3, 1))]

In [144]:
### count the word 
rdd = sc.textFile("/home/jordan/Desktop/input.txt")
rdd

/home/jordan/Desktop/input.txt MapPartitionsRDD[4] at textFile at NativeMethodAccessorImpl.java:0

In [147]:
words = rdd.flatMap(lambda x: x.split(" "))
words

PythonRDD[6] at RDD at PythonRDD.scala:48

In [149]:
words.map(lambda x: (x,1)).collect()

[(u'INFO', 1),
 (u'This', 1),
 (u'is', 1),
 (u'a', 1),
 (u'message', 1),
 (u'with', 1),
 (u'content', 1),
 (u'INFO', 1),
 (u'This', 1),
 (u'is', 1),
 (u'a', 1),
 (u'some', 1),
 (u'other', 1),
 (u'content', 1),
 (u'(empty', 1),
 (u'line)', 1),
 (u'INFO', 1),
 (u'Here', 1),
 (u'are', 1),
 (u'more', 1),
 (u'messages', 1),
 (u'WARN', 1),
 (u'This', 1),
 (u'is', 1),
 (u'a', 1),
 (u'(empty', 1),
 (u'line)', 1),
 (u'ERROR', 1),
 (u'Something', 1),
 (u'bad', 1),
 (u'happened', 1),
 (u'WARN', 1),
 (u'More', 1),
 (u'details', 1),
 (u'on', 1),
 (u'the', 1),
 (u'bad', 1),
 (u'thing', 1),
 (u'INFO', 1),
 (u'back', 1),
 (u'to', 1),
 (u'normal', 1),
 (u'messages', 1)]

In [151]:
words.map(lambda x: (x,1)).reduceByKey(lambda x, y:x +y).collect()

[(u'is', 3),
 (u'some', 1),
 (u'back', 1),
 (u'are', 1),
 (u'message', 1),
 (u'More', 1),
 (u'Here', 1),
 (u'Something', 1),
 (u'content', 2),
 (u'to', 1),
 (u'other', 1),
 (u'details', 1),
 (u'line)', 2),
 (u'more', 1),
 (u'INFO', 4),
 (u'normal', 1),
 (u'This', 3),
 (u'WARN', 2),
 (u'ERROR', 1),
 (u'with', 1),
 (u'a', 3),
 (u'on', 1),
 (u'messages', 2),
 (u'thing', 1),
 (u'bad', 2),
 (u'(empty', 2),
 (u'the', 1),
 (u'happened', 1)]

In [143]:
sc.parallelize([3,4,5]).collect()

[3, 4, 5]

## 累加器

## combineByKey()
### new element => createCombiner()
### already element =>mergeValue()

### 如果兩個以上的分割有同一個鍵值的累加器,會使用mergeCombiners()

In [180]:
test = sc.parallelize({("coffee",1),("coffee",2),("panada",3)})

In [181]:
sumCount = test.combineByKey((lambda x: (x,1)),
                  (lambda x, y: (x[0] + y, x[1] + 1)),
                  (lambda x, y: (x[0] + y[0], x[1] + y[1])))
sumCount

PythonRDD[54] at RDD at PythonRDD.scala:48

In [182]:
sumCount.map(lambda key, xy: (key, xy[0]/xy[1]))

PythonRDD[55] at RDD at PythonRDD.scala:48

In [183]:
r = sumCount.map(lambda key, xy: (key, xy[0]/xy[1])).collectAsMap()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 25.0 failed 1 times, most recent failure: Lost task 0.0 in stage 25.0 (TID 15, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/jordan/Desktop/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/home/jordan/Desktop/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/jordan/Desktop/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
TypeError: <lambda>() takes exactly 2 arguments (1 given)

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/jordan/Desktop/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/home/jordan/Desktop/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/jordan/Desktop/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
TypeError: <lambda>() takes exactly 2 arguments (1 given)

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


### 調校平行度

### 每個RDD都會有固定的分割(partitions), 這會決定平行化的程度

In [184]:
data = [("a", 3), ("b", 4), ("a", 1)]

In [185]:
sc.parallelize(data).reduceByKey(lambda x, y: x +y) #預設平行化的程度

PythonRDD[62] at RDD at PythonRDD.scala:48

In [186]:
sc.parallelize(data).reduceByKey(lambda x, y: x + y, 10) #指定 平行度

PythonRDD[68] at RDD at PythonRDD.scala:48

### 資料分組

In [187]:
#groupByKey() => [K, Iterable[V]]
#cogroup() => [k, (Iterable[v], Iterable[W])] if value don't have it will show the null

### leftOuterJoin() and rightOuterJoin() 每個鍵必須有一個來源的RDD, 以及一個來源RDD的Option值 

### 對排序資料呼叫collect() 或是save()時 sorteByKey(),結果都是經過排序的

In [190]:
rdd.sortByKey(ascending=True, numPartitions=None, keyfunc = lambda x: str(x)).collect()

[u'(empty line)',
 u'(empty line)',
 u'ERROR Something bad happened',
 u'INFO This is a message with content',
 u'INFO This is a some other content',
 u'INFO Here are more messages',
 u'INFO back to normal messages',
 u'WARN More details on the bad thing']

In [191]:
rdd.collect()

[u'INFO This is a message with content',
 u'INFO This is a some other content',
 u'(empty line)',
 u'INFO Here are more messages',
 u'(empty line)',
 u'ERROR Something bad happened',
 u'WARN More details on the bad thing',
 u'INFO back to normal messages']

In [199]:
num.countByKey()

defaultdict(int, {1: 1, 3: 2})

In [202]:
type(num.collectAsMap())

dict

In [201]:
num.collect()

[(1, 2), (3, 4), (3, 6)]

In [204]:
num.lookup(3) #lookup(key)

[4, 6]

### 讓輸出資料達到最小的網路傳輸量可以大幅地增進效能


In [None]:
### Spark 內部機制會知道每個操作對於分割的影響,並且自動地對分割資料操作所產生的RDD設定Partioner
### 因為轉換操作不能保證產生分割的操作,所以回傳的RDD將不會設定partitioner
### 