# Chapter 4: Working with Key/Value Pairs

###### Creating Pair RDDs

In [3]:
val lines = sc.textFile("/home/sparkuser/spark-1.6.2-bin-hadoop2.6/README.md")
lines.
  filter(x => x.split(" ")(0).length > 5).
  collect().
  foreach(println)

high-level APIs in Scala, Java, Python, and R, and an optimized engine that
supports general computation graphs for data analysis. It also supports a
<http://spark.apache.org/>
guide, on the [project web page](http://spark.apache.org/documentation.html)
["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).
Alternatively, if you prefer Python, you can use the Python shell:
examples to a cluster. This can be a mesos:// or spark:// URL,
"yarn" to run on YARN, and "local" to run
locally with one thread, or "local[N]" to run locally with N threads. You
package. For instance:
Testing first requires [building Spark](#building-spark). Once Spark is built, tests
Please see the guidance on how to
storage systems. Because the protocols have changed in different versions of
Hadoop, you must build Spark against the same version that your cluster runs.
Please refer to the build documentation at
["Specifying the Hadoop Version"](http://spark.apache.org/docs/latest/building-spar

Example 1.

In [338]:
val pairs_detail = lines.
  filter(x => x.split(" ")(0).length > 5).
  map(x => (x.split(" ")(0), x.split(" ")))
  
pairs_detail.
  take(10).
  foreach(println)

(high-level,[Ljava.lang.String;@7d91d77)
(supports,[Ljava.lang.String;@6cb14793)
(<http://spark.apache.org/>,[Ljava.lang.String;@1cc05d4c)
(guide,,[Ljava.lang.String;@10e275d9)
(["Building,[Ljava.lang.String;@5f1fe017)
(Alternatively,,[Ljava.lang.String;@39f4a9b8)
(examples,[Ljava.lang.String;@27c0a9f7)
("yarn",[Ljava.lang.String;@248f4c6b)
(locally,[Ljava.lang.String;@4b01fb5)
(package.,[Ljava.lang.String;@1815dff2)


In [319]:
pairs_detail.
  collect().
  foreach{
    case (k, v) => {
        print(k + ": ");
        v.foreach(vv => print(vv + ", "))
      }
    ; println()
  }

high-level: high-level, APIs, in, Scala,, Java,, Python,, and, R,, and, an, optimized, engine, that, 
supports: supports, general, computation, graphs, for, data, analysis., It, also, supports, a, 
<http://spark.apache.org/>: <http://spark.apache.org/>, 
guide,: guide,, on, the, [project, web, page](http://spark.apache.org/documentation.html), 
["Building: ["Building, Spark"](http://spark.apache.org/docs/latest/building-spark.html)., 
Alternatively,: Alternatively,, if, you, prefer, Python,, you, can, use, the, Python, shell:, 
examples: examples, to, a, cluster., This, can, be, a, mesos://, or, spark://, URL,, 
"yarn": "yarn", to, run, on, YARN,, and, "local", to, run, 
locally: locally, with, one, thread,, or, "local[N]", to, run, locally, with, N, threads., You, 
package.: package., For, instance:, 
Testing: Testing, first, requires, [building, Spark](#building-spark)., Once, Spark, is, built,, tests, 
Please: Please, see, the, guidance, on, how, to, 
storage: storage, systems., Bec

Example 2.

In [310]:
val pairs_simple = lines.
  filter(x => x.split(" ")(0).length > 5).
  map(x => (x.split(" ")(0), x.split(" ").length))
  
pairs_simple.
  collect().
  foreach(println)

(high-level,13)
(supports,11)
(<http://spark.apache.org/>,1)
(guide,,6)
(["Building,2)
(Alternatively,,11)
(examples,12)
("yarn",9)
(locally,13)
(package.,3)
(Testing,10)
(Please,7)
(storage,11)
(Hadoop,,13)
(Please,7)
(["Specifying,4)
(building,8)
(Please,6)


### Transformations on Pair RDDs
> - reduce(), fold(), combine()과 유사한 기능들이 Pair RDD을 위해서 각각 reduceByKey(), foldByKey(), combineByKey()으로 존재한다. 다만 reduce(), fold(), combine()는 **action**인 반면, reduceByKey(), foldByKey(), combineByKey()는 **transform**이다.

###### filter() for pair RDDs
> - pair RDD에 filter를 적용하는 아래 두 코드는 같다.
> - filter는 기본적으로 하나의 parameter를 받아 boolean을 return하는 함수를 argument로 받으므로, pair RDD에 이를 적용할 때에는 argument함수를 적절히 변형해야 한다.

In [5]:
pairs_simple.
  filter(x => x._2 > 5).
  collect()

Array((high-level,13), (supports,11), (guide,,6), (Alternatively,,11), (examples,12), ("yarn",9), (locally,13), (Testing,10), (Please,7), (storage,11), (Hadoop,,13), (Please,7), (building,8), (Please,6))

In [23]:
pairs_simple.
  filter{
    case (k, v) => v > 5
  }.
  collect()

Array((high-level,13), (supports,11), (guide,,6), (Alternatively,,11), (examples,12), ("yarn",9), (locally,13), (Testing,10), (Please,7), (storage,11), (Hadoop,,13), (Please,7), (building,8), (Please,6))

###### reduceByKey(func)
> - Transformation for pair RDD
> - groupByKey().reduce() 와 동일하며, reduceByKey가 성능이 더 좋다.

Example 1.

In [317]:
pairs_detail.
  reduceByKey((v1, v2) => v1 ++ v2).
  collect().
  foreach{
    case (k, v) => {
        print(k + ": ");
        v.foreach(vv => print(vv + ", "))
      }
    ; println()
  }

Please: see, the, guidance, on, how, to, refer, to, the, build, documentation, at, refer, to, the, [Configuration, Guide](http://spark.apache.org/docs/latest/configuration.html), 
guide,: on, the, [project, web, page](http://spark.apache.org/documentation.html), 
high-level: APIs, in, Scala,, Java,, Python,, and, R,, and, an, optimized, engine, that, 
locally: with, one, thread,, or, "local[N]", to, run, locally, with, N, threads., You, 
Hadoop,: you, must, build, Spark, against, the, same, version, that, your, cluster, runs., 
examples: to, a, cluster., This, can, be, a, mesos://, or, spark://, URL,, 
package.: For, instance:, 
storage: systems., Because, the, protocols, have, changed, in, different, versions, of, 
["Specifying: the, Hadoop, Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version), 
["Building: Spark"](http://spark.apache.org/docs/latest/building-spark.html)., 
"yarn": to, run, on, YARN,, and, "local", to, run, 
Alternatively,: 

Example 2.

In [311]:
pairs_simple.
  reduceByKey((v1, v2) => v1 + v2).
  collect().
  foreach(println)

(Please,20)
(guide,,6)
(high-level,13)
(locally,13)
(Hadoop,,13)
(examples,12)
(package.,3)
(storage,11)
(["Specifying,4)
(["Building,2)
("yarn",9)
(Alternatively,,11)
(Testing,10)
(supports,11)
(<http://spark.apache.org/>,1)
(building,8)


###### foldByKey()
> - Transformation for pair RDD
> - reduceByKey()와 동일하나 zero value를 갖는다는 차이
> - groupByKey().fold()와 동일, foldByKey가 성능이 더 좋음

In [27]:
pairs_simple.
  foldByKey(10)((v1, v2) => v1 + v2).
  collect().
  foreach(println)

(Please,30)
(guide,,16)
(high-level,23)
(locally,23)
(Hadoop,,23)
(examples,22)
(package.,13)
(storage,21)
(["Specifying,14)
(["Building,12)
("yarn",19)
(Alternatively,,21)
(Testing,20)
(supports,21)
(<http://spark.apache.org/>,11)
(building,18)


###### groupByKey()

Example 1

In [328]:
pairs_detail.
  groupByKey().
  collect().
  foreach{
    case (k, v) => {
        print(k + ": ");
        v.foreach(vv => print(vv + ", "))
      }
    ; println()
  }

Please: [Ljava.lang.String;@7f288d76, [Ljava.lang.String;@c98d34c, [Ljava.lang.String;@158bde0, 
guide,: [Ljava.lang.String;@287d9159, 
high-level: [Ljava.lang.String;@1385fa1d, 
locally: [Ljava.lang.String;@594d4992, 
Hadoop,: [Ljava.lang.String;@1d3e58e6, 
examples: [Ljava.lang.String;@598df541, 
package.: [Ljava.lang.String;@65e4b97f, 
storage: [Ljava.lang.String;@56566d88, 
["Specifying: [Ljava.lang.String;@3940e6cc, 
["Building: [Ljava.lang.String;@f513b58, 
"yarn": [Ljava.lang.String;@4f3e548c, 
Alternatively,: [Ljava.lang.String;@2a614ace, 
Testing: [Ljava.lang.String;@1429bfc8, 
supports: [Ljava.lang.String;@1224cdee, 
<http://spark.apache.org/>: [Ljava.lang.String;@2f037112, 
building: [Ljava.lang.String;@f147906, 


In [349]:
pairs_detail.
  groupByKey().
  collect().
  foreach{
    case (k, v) => {
        print(k + ": ");
        v.foreach{
          vv => vv.foreach(vvv => print(vvv + ", "))
          print("| ")
        }
      }
    ; println()
  }

Please: Please, see, the, guidance, on, how, to, | Please, refer, to, the, build, documentation, at, | Please, refer, to, the, [Configuration, Guide](http://spark.apache.org/docs/latest/configuration.html), | 
guide,: guide,, on, the, [project, web, page](http://spark.apache.org/documentation.html), | 
high-level: high-level, APIs, in, Scala,, Java,, Python,, and, R,, and, an, optimized, engine, that, | 
locally: locally, with, one, thread,, or, "local[N]", to, run, locally, with, N, threads., You, | 
Hadoop,: Hadoop,, you, must, build, Spark, against, the, same, version, that, your, cluster, runs., | 
examples: examples, to, a, cluster., This, can, be, a, mesos://, or, spark://, URL,, | 
package.: package., For, instance:, | 
storage: storage, systems., Because, the, protocols, have, changed, in, different, versions, of, | 
["Specifying: ["Specifying, the, Hadoop, Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version), | 
["Building: ["Buildin

Example 2.

In [329]:
pairs_simple.
  groupByKey().
  collect().
  foreach(println)

(Please,CompactBuffer(7, 7, 6))
(guide,,CompactBuffer(6))
(high-level,CompactBuffer(13))
(locally,CompactBuffer(13))
(Hadoop,,CompactBuffer(13))
(examples,CompactBuffer(12))
(package.,CompactBuffer(3))
(storage,CompactBuffer(11))
(["Specifying,CompactBuffer(4))
(["Building,CompactBuffer(2))
("yarn",CompactBuffer(9))
(Alternatively,,CompactBuffer(11))
(Testing,CompactBuffer(10))
(supports,CompactBuffer(11))
(<http://spark.apache.org/>,CompactBuffer(1))
(building,CompactBuffer(8))


###### combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
> - 가장 기본이되는 per-key aggregation function으로서, 이것을 이용해 다른 대부분의 per-key combiner 들이 구현되어 있다.
> - combine()과 마찬가지로 input data와 다른 type의 값을 return value로 얻을 수 있다.  
  
  
> - combine을 위해 각 partition의 element들을 거쳐가면서 만나게되는 key는 '만나지 못했던 key' 이거나 '이미 만났던 key'로 나뉠 것이다.
> - 만약 '만나지 못했던 key'라면 **createCombiner** 함수를 사용하여 그 key에 대한 초기값을 설정하게 된다.(이러한 초기값 설정은 전체 RDD에서 처음 등장하는 key에 대해서가 아니라 각 partition에서 처음 등장하는 key에 대해 일어난다.) 
> - 또한 만약 '이미 만났던 key'라면 **mergeValue** 함수를 사용하여 그 key에 대한 이전의 누적값(accumulator)과 새로운 value값에 대한 연산한다.
> - 앞서 createCombiner과 mergeValue를 이용한 집계는 각 partition단위로 일어났던 것이고, 하나의 key가 여러 partition에 존재할 수 있다.이때 동일한 key에 대한 여러 partition의 값들을 집계하기 위하여 **mergeCombiners** 함수를 사용한다.

아래 예제에서는 각 문장의 시작단어에 따른 각 문장의 단어수 평균을 계산해 본다.

In [83]:
lines.
  map(x => (x.split(" ")(0), x.split(" ").length)).
  take(5).
  foreach(println)

(#,3)
(,1)
(Spark,14)
(high-level,13)
(supports,11)


In [85]:
lines.
  map(x => (x.split(" ")(0), x.split(" ").length)).
  combineByKey(
    newv => (newv, 1),
    (acc: (Int, Int), oldv) => (acc._1 + oldv, acc._2 + 1),
    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
  ).
  take(5).
  foreach(println)

(And,(10,1))
(Spark,(44,4))
(Please,(20,3))
(The,(12,1))
([run,(8,1))


In [88]:
lines.
  map(x => (x.split(" ")(0), x.split(" ").length)).
  combineByKey(
    newv => (newv, 1),
    (acc: (Int, Int), oldv) => (acc._1 + oldv, acc._2 + 1),
    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
  ).
  map{case (k, v) => (k, v._1 / v._2.toFloat)}.
  takeOrdered(10)(Ordering[Float].reverse on(x => x._2)).
  foreach(println)

(high-level,13.0)
(locally,13.0)
(Hadoop,,13.0)
((You,13.0)
(Many,13.0)
(examples,12.0)
(for,12.0)
(in,12.0)
(The,12.0)
(rich,12.0)


###### mapValues(func)

In [331]:
pairs_simple.
  collect().
  foreach(println)

(high-level,13)
(supports,11)
(<http://spark.apache.org/>,1)
(guide,,6)
(["Building,2)
(Alternatively,,11)
(examples,12)
("yarn",9)
(locally,13)
(package.,3)
(Testing,10)
(Please,7)
(storage,11)
(Hadoop,,13)
(Please,7)
(["Specifying,4)
(building,8)
(Please,6)


In [334]:
pairs_simple.
  mapValues(v => v - 1).
  collect().
  foreach(println)

(high-level,12)
(supports,10)
(<http://spark.apache.org/>,0)
(guide,,5)
(["Building,1)
(Alternatively,,10)
(examples,11)
("yarn",8)
(locally,12)
(package.,2)
(Testing,9)
(Please,6)
(storage,10)
(Hadoop,,12)
(Please,6)
(["Specifying,3)
(building,7)
(Please,5)


> - 위의 표현은 아래와 같이 map을 이용하여 표현할 수 있다.

In [25]:
pairs_simple.
  map{case (k, v) => (k, v - 1)}.
  collect().
  foreach(println)

(high-level,12)
(supports,10)
(<http://spark.apache.org/>,0)
(guide,,5)
(["Building,1)
(Alternatively,,10)
(examples,11)
("yarn",8)
(locally,12)
(package.,2)
(Testing,9)
(Please,6)
(storage,10)
(Hadoop,,12)
(Please,6)
(["Specifying,3)
(building,7)
(Please,5)


###### flatMapValues(func)
> - 각 (key, value)에 대하여 (value, result1), (value, result2), (value, result...)가 생기고, (key, result1), (key, result2), (key, result3)...이 반환된다.

In [346]:
pairs_simple.
  flatMapValues(v => (v to 5)).
  collect().
  foreach(println)

(<http://spark.apache.org/>,1)
(<http://spark.apache.org/>,2)
(<http://spark.apache.org/>,3)
(<http://spark.apache.org/>,4)
(<http://spark.apache.org/>,5)
(["Building,2)
(["Building,3)
(["Building,4)
(["Building,5)
(package.,3)
(package.,4)
(package.,5)
(["Specifying,4)
(["Specifying,5)


###### keys()

In [335]:
pairs_simple.
  keys.
  collect().
  foreach(println)

high-level
supports
<http://spark.apache.org/>
guide,
["Building
Alternatively,
examples
"yarn"
locally
package.
Testing
Please
storage
Hadoop,
Please
["Specifying
building
Please


###### values()

In [347]:
pairs_simple.
  values.
  collect().
  foreach(println)

13
11
1
6
2
11
12
9
13
3
10
7
11
13
7
4
8
6


###### sortByKey()
> - 아래와 같이 custom sort를 사용할 수도 있다.

In [348]:
pairs_simple.
  sortByKey().
  collect().
  foreach(println)

("yarn",9)
(<http://spark.apache.org/>,1)
(Alternatively,,11)
(Hadoop,,13)
(Please,7)
(Please,7)
(Please,6)
(Testing,10)
(["Building,2)
(["Specifying,4)
(building,8)
(examples,12)
(guide,,6)
(high-level,13)
(locally,13)
(package.,3)
(storage,11)
(supports,11)


> - 아래 예에서는 string의 첫번째 혹은 두번째 문자를 비교하여 정렬한다.

In [44]:
implicit val sortIntegersByString = new Ordering[String] {
  override def compare(a: String, b: String) = a(0).toLower.compare(b(0).toLower)
}

In [45]:
pairs_simple.
  sortByKey(true).
  collect().
  foreach(println)

("yarn",9)
(<http://spark.apache.org/>,1)
(["Building,2)
(["Specifying,4)
(Alternatively,,11)
(building,8)
(examples,12)
(guide,,6)
(high-level,13)
(Hadoop,,13)
(locally,13)
(package.,3)
(Please,7)
(Please,7)
(Please,6)
(supports,11)
(storage,11)
(Testing,10)


In [46]:
implicit val sortIntegersByString = new Ordering[String] {
  override def compare(a: String, b: String) = a(1).toLower.compare(b(1).toLower)
}

In [47]:
pairs_simple.
  sortByKey(true).
  collect().
  foreach(println)

(["Building,2)
(["Specifying,4)
(package.,3)
(Hadoop,,13)
(Testing,10)
(<http://spark.apache.org/>,1)
(high-level,13)
(Alternatively,,11)
(Please,7)
(Please,7)
(Please,6)
(locally,13)
(storage,11)
(supports,11)
(guide,,6)
(building,8)
(examples,12)
("yarn",9)


### Transformations on two pair RDDs

In [1]:
val rdd = sc.parallelize(Seq((1,2), (3,4), (3, 6)))
rdd.collect().foreach(println); println
val other = sc.parallelize(List((3, 9)))
other.collect().foreach(println)

(1,2)
(3,4)
(3,6)

(3,9)


###### subtractByKey

In [371]:
rdd.
  subtractByKey(other).
  collect()

Array((1,2))

###### join

In [373]:
rdd.
  join(other).
  collect()

Array((3,(4,9)), (3,(6,9)))

###### rightOuterJoin
> - (참고로) Scala에는 Option이라는 datatype이 존재한다. Option type의 value는 그 값이 x로 존재할 경우 Some(x)이고, 그 값이 존재하지 않을 경우 None이다.

In [374]:
rdd.
  rightOuterJoin(other).
  collect()

Array((3,(Some(4),9)), (3,(Some(6),9)))

###### leftOuterJoin

In [375]:
rdd.
  leftOuterJoin(other).
  collect()

Array((1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9))))

###### cogroup

In [376]:
rdd.
  cogroup(other).
  collect()

Array((1,(CompactBuffer(2),CompactBuffer())), (3,(CompactBuffer(4, 6),CompactBuffer(9))))

### Action on Pair RDDs

In [383]:
pairs_simple.
  collect()

Array((high-level,13), (supports,11), (<http://spark.apache.org/>,1), (guide,,6), (["Building,2), (Alternatively,,11), (examples,12), ("yarn",9), (locally,13), (package.,3), (Testing,10), (Please,7), (storage,11), (Hadoop,,13), (Please,7), (["Specifying,4), (building,8), (Please,6))

###### countByKey()

In [384]:
pairs_simple.
  countByKey()

Map(Please -> 3, Hadoop, -> 1, examples -> 1, package. -> 1, locally -> 1, <http://spark.apache.org/> -> 1, building -> 1, guide, -> 1, supports -> 1, high-level -> 1, "yarn" -> 1, storage -> 1, ["Building -> 1, Alternatively, -> 1, ["Specifying -> 1, Testing -> 1)

###### collectAsMap()

In [385]:
pairs_simple.
  collectAsMap()

Map(high-level -> 13, Hadoop, -> 13, Please -> 6, locally -> 13, ["Specifying -> 4, package. -> 3, Alternatively, -> 11, building -> 8, Testing -> 10, "yarn" -> 9, ["Building -> 2, guide, -> 6, examples -> 12, <http://spark.apache.org/> -> 1, supports -> 11, storage -> 11)

###### lookup(key)

In [386]:
pairs_simple.
  lookup("Please")

WrappedArray(7, 7, 6)

### Word count example

> - 데이터를 살펴보면 아래와 같다.

In [55]:
lines.
  take(5).
  foreach(println)

# Apache Spark

Spark is a fast and general cluster computing system for Big Data. It provides
high-level APIs in Scala, Java, Python, and R, and an optimized engine that
supports general computation graphs for data analysis. It also supports a


> - flatMap: 우선 각 단어를 빈칸(" ")으로 나눈다. 이때 flatMap을 사용하면 각 line 구분을 없앨 수 있다.
> - map: 그리고나서 각 단어를 (단어, 1)로 바꿔준다.

In [56]:
lines.
  flatMap(x => x.split(" ")).
  map(x => (x, 1)).
  take(10).
  foreach(println)

(#,1)
(Apache,1)
(Spark,1)
(,1)
(Spark,1)
(is,1)
(a,1)
(fast,1)
(and,1)
(general,1)


> - reduceByKey: 각 key에 해당하는 value값들을 더해준다.
> - takeOrdered: 가장빈번하게 등장하는 단어를 살펴본다.

In [60]:
lines.
  flatMap(x => x.split(" ")).
  map(x => (x, 1)).
  reduceByKey((x, y) => x + y).
  takeOrdered(10)(Ordering[Int].reverse on(x => x._2)).
  foreach(println)

(,67)
(the,21)
(to,14)
(Spark,13)
(for,11)
(and,10)
(a,8)
(##,8)
(run,7)
(is,6)


### Parallelism
> - size of partition을 명시적으로 지정할 수 있다. 이러한 지정은 RDD 생성시 혹은 Transformation시 지정 할수도 있다.

> - repartition(): 말그대로 파티션을 다시 조정하는 기능이다. network를 따라 shuffle이 발생하므로 고비용 함수이다.
> - coalesce(): 최적화된 repartition()으로서 파티션을 줄이는 경우에 효과적으로 사용할 수 있다.

> - rdd.partitions.size를 통해 파티션 수를 확인 할 수 있다.

In [22]:
val rdd1 = sc.parallelize(Seq((1,2), (3,4), (3, 6), (4, 9)), 2)
val rdd2 = rdd1.reduceByKey((x, y) => x + y, 4)

In [23]:
println("# of partition of rdd1: " + rdd1.partitions.size)
println("# of partition of rdd2: " + rdd2.partitions.size)

# of partition of rdd1: 2
# of partition of rdd2: 4


### Data Partitioning

> - pair RDD에 대하여 partitioning을 수행할 수 있다.
> - 그렇다면 어느 노드에 어느 데이터를 놓는 것이 연산을 위해 효과적일까? 즉 어떤 규칙으로 partition을 정해야 할까?
> - 물론 Spark가 자동적으로 효과적인 partiton을 설정하지만, 사용자가 어느정도 그 원칙을 컨트롤 할수는 있다. 예를들어 hash partitioning이나 range partitioning등의 방법이나 partition의 수를 선택할 수 있다.

> - 예를들어 1만건의 접속기록이 있고, 매 5분마나 100건의 접속기록이 유입된다. 매번 100건이 당일 접속했던 유저의 접속인지 혹은 당일 처음 접속하는 유저의 접속기록인지 알고 싶다. 이를 위해 두 데이터 세트를 join하는 일이 필요한데, join을 할 때마다 두 데이터 세트의 key를 hashing하고 hashing결과에 따라 특정 node로 data를 옮기는 shuffling작업이 필요하다.  
> 이러한 shuffling을 피하기 위하여 미리 1만건의 데이터를 hash partition해 놓는다면, 5분마다 일어나는 이 join의 속도가 매우 향상될 것이다.

In [49]:
val pairs_simple = lines.
  filter(x => x.split(" ")(0).length > 5).
  map(x => (x.split(" ")(0), x.split(" ").length))

pairs_simple.
  collect().
  foreach(println)

(high-level,13)
(supports,11)
(<http://spark.apache.org/>,1)
(guide,,6)
(["Building,2)
(Alternatively,,11)
(examples,12)
("yarn",9)
(locally,13)
(package.,3)
(Testing,10)
(Please,7)
(storage,11)
(Hadoop,,13)
(Please,7)
(["Specifying,4)
(building,8)
(Please,6)


> - 아래와 같이 HashPartitioner를 'partition수'와 함께 할당하게 된다. 그런데 partitionBy는 transformation이므로 그 결과는 새로운 RDD이다. 따라서 새로운 변수로 그 결과를 받아야 하고, persist()를 하지 않으면 매번 hash partitioning이 일어나게 된다. 꼭 persist()를 해야 함.

In [51]:
import org.apache.spark.HashPartitioner
val parti_rdd = pairs_simple.
  partitionBy(new HashPartitioner(10)).
  persist()

> - 기본적으로 Spark는 transformation에 따라 적절한 partition rule을 선택한다. 예를들어 sortByKey()는 range-partition을, groupByKey()는 hash-partition을 선택하여 결과를 node에 분배한다. 반면 map()은 parent RDD의 partitioning information을 잊게 만든다.(map으로 key를 수정할 수 있으므로..)

> - RDD에 partitioner가 정의되어 있는지 확인하려면, RDD의 **partitioner** property를 확인하면 된다. 이 값은 scala.Option type으로서 값이 정의되어 있거나 그렇지 않을 수 있다. 정의되어 있는지 확인하려면 .isDefined 를 사용하여, 그 값을 얻으려면 .get을 사용할 수 있다.

In [57]:
parti_rdd.partitioner

Some(org.apache.spark.HashPartitioner@a)

In [58]:
parti_rdd.partitioner.isDefined

true

In [61]:
parti_rdd.partitioner.get

org.apache.spark.HashPartitioner@a

###### Operations that benefit from partitioning
> - cogroup(), groupwidth(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), lookup()은 partitioning을 할 경우 성능이 향상되는 operation이다.

> - 예를들어 reduceByKey()를 partition되어 있는 RDD에 사용할 경우, 같은 key를 같는 데이터는 같은 partition에 있을 확률이 높게 되므로, 각 node에서 reduce연산이 진행되면 연산 그 자체는 끝이나게 된다. (물론 집계를 위해서 각 worker node에서 master로 데이터 이동이 필요하다.)
> - 또한 cogroup()이나 join()와 같은 binary operation의 경우 최소한 한쪽 RDD만이라도 partition이 되어 있다면 shuffling이 줄어들게 된다. 만약 두 RDD가 동일한 partitioner를 갖고 있다면 그 연산의 성능은 더 좋아지게 된다.(예를들어 rdd1과 rdd2 = rdd.mapValue(..)이고 rdd1.cogroup(rdd2)인 경우)


###### Operations that affect partitioning
> - Spark는 operation에 따라 그 결과에 자동적으로 partitioner를 할당한다. 예를들어 두 RDD를 join()한 경우 그 결과는 hash-partition되어 있을 것이라 판단한다. 따라서 바로 이후에 진행되는 reduceByKey()의 경우 별도의 shuffling이 일어나지 않아 그 성능이 매우 좋게 된다.

> - 반면 pair-RDD에 map()을 사용한 경우, (이론적으로) map()연산이 key를 변경했을 수 있으므로 Spark는 그 결과에 대한 partitioner를 판단할 수 없게 된다. 따라서 pair-RDD의 key를 수정하지 않는 연산을 하는 경우 **mapValues()**나 **flatMapValues()**를 사용하는 것이 매우 효과적이다.

> - cogroup(), groupwidth(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort()의 경우 기본적으로 결과 RDD에 대한 partitioner가 할당된다. 또한 mapValue(), flatMapValues(), filter()의 경우 parent RDD에 partitioner가 할당되어 있을 경우 그 결과에 대한 partitioner가 할당된다.), 반면 다른 모든 operation에 대해서는 partitioner가 할당되지 않는다.

> - 마지막으로 binary operation에 대해서는 parent RDD의 partitioner에 따라 결과의 partitioner가 결정된다. 

###### end of chapter 4