Skip to content
SangminLim edited this page Nov 30, 2017 · 18 revisions

RDD 프로그래밍 하기

RDD(Resilient Distributed DataSet)

: 분산되어 존재하는 데이터 요서들의 모임

  • 새로운 RDD를 만들거나
  • 존재하는 RDD를 변형하거나
  • 결과 계산을 위해 RDD에서 연산을 호출

스파크는 내부적으로 스파크는 자동으로 RDD에 있는 데이터들을 클러스터에 분배하며 클러스터 위에서 수행하는 연산들을 병렬화 한다.

RDD 기초

  • RDD는 분산되어 존재하는 변경 불가능한 객체 모음
  • RDD는 사용자 정의 클래스를 포함해 파이썬, 자바, 스칼라의 어떤 타입의 객체든 가질 수 있다.
  • 두 가지 타입의 연산을 지원
    • 트랜스포메이션
    • 액션
  • 스파크는 RDD를 여유로운 방식(lazy evaluation) 으로 처음 액션을 사용하는 시점에 처리한다.

RDD 생성하기

  • 외부 데이터 로드 val lines = sc.textFile("README.md")
  • 직접 만든 프로그램에서 데이터 집합을 병렬화
val lines2 = sc.parallelize(List("pandas", "i like pandas"))

RDD의 연산

  • 트랜스포메이션(transformation)
    • 새로운 RDD를 만들어내는 연산(ex. map(), filter())
     val inputRDD = sc.textFile("log.txt")
     val errorRDD = inputRDD.filter(line => line.contains("error"))
  • 액션(action)
    • 프로그램에 결과를 돌려주거나 스토리지에 결과를 써 넣는 연산(ex. count(), first())
     println("Input had" + badLinesRDD.count() + " concerning lines")
     println("Here are 10 example:")
     badLinesRDD.take(10).foreach(println)

트랜스포메이션

  • 새로운 RDD를 만들어 돌려주는 연산
  • 실제로 액션이 사용되는 시점에 계산됨
  • RDD는 변경 불가능한것
  • 가계도(lineage graph)
    • 필요시 각 RDD를 재연산하거나 저장된 RDD를 유실될 경우 복구하기 위함

액션

  • 프로그램에 최종 결과 값을 돌려주거나 외부 저장소에 값을 기록하는 연산 작업
  • 실제로 트랜스포메이션이 계산을 수행 하도록 만든다.
  • 새로운 액션을 호출할 때마다 전체 RDD가 처음부터 계산됨

여유로운 수행 방식

  • 스파크가 액션을 만나기 전까지 트랜스포메이션을 처리하지 않음
  • 함수형 언어에서는 친숙한 방식
  • RDD는 데이터를 가지고 있지 않다. 메타데이터에 명령어만 기록 후, 액션을 만날 때 실제로 수행

스파크에 함수 전달하기

class SearchFunctions(val query: String) {
     def isMatch(s: String): Boolean = {
          s.contains(query)
     }

     def getMatchesFunctionReference(rdd: RDD[String]):  RDD[Boolean] = {
          //문제 : "isMatch"는 "this.isMatch"이므로 this의 모든 것이 전달된다.
          rdd.map(isMatch)
     }

     def getMatchFieldReference(rdd: RDD[String]): RDD[Array[String]] = {
          //문제 : "query"는 "this.query"이므로 this의 모든 것이 전달된다.
          rdd.map(x => x.split(query))
     }

     def getMatchesReference(rdd: RDD[String]): RDD[Array[String]] = {
          // 안전함: 필요한 필드만 추출하여 지역 변수에 저장해 전달한다.
          val query_ = this.query
          rdd.map(x => x.split(query_))
     }
}

스칼라에서 NoSerializableException이 발생한다면 직렬화 불가능한 클래스의 메소드나 필드를 참조하는 문제일 가능성이 많다. 최상위 객체의 멤버인 지역 변수나 함수 내에서 전달하는 것은 항상 안전하다는 점을 기억하자.

주로쓰는 트랜스포메이션

데이터 요소 위주 트랜스포메이션

- map(), filter(), flatMap(), distinct()
val input = sc.parallelize(List(1,2,3,4))
val result = input.map(x => x * x)
println(result.collect().mkstring(","))

val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
words.first() // "hello"를 반환

가상집합연산

- distinct(), union(), intersection(), subtract(), catesian()
  • 주로쓰는 액션

  • reduce()

    • 두 개의 데이터를 합쳐 같은 타입 데이터 하나를 변환 하는 함수를 받는다.
     val sum = rdd.reduce((x,y) => x+y)
  • fold() 는 reduce()에 전달하는 것과 동일한 형태의 함수를 인자로 받으며, 거기에 추가로 각 파티션의 초기 호출에 쓰이는 "제로 벨류"를 인자로 받는다. fold(), reduce()는 결과 값의 타입이 RDD 내에서 연산하는 데이터 요소들의 타입과 동일해야 한다.

  • aggregate()는 RDD에서 동일한 타입을 되돌려 주어야 한다는 제한에서 자유로울 수 있다. aggregate()는 fold() 처럼 리턴받는 타입에 맞는 제로 벨류가 필요하다. 그리고 RDD의 값들을 누적값에 연계해 주는 함수가 필요하며, 마지막으로는 각 노드에서 자체적으로 값들을 합칠 수 있도록 두 개의 누적값을 합쳐 주는 두 번째 함수가 필요하다.

map() 과 fold()를 어떻게 대신하는지 보여 준다.

val result = input.aggregate((0,0)) (
                    (acc, value) => (acc._1 + value, acc._2 + 1),
                    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

val avg = result._1/ result._2.toDouble

- take(), count(), countByValue(), collect(), foreach()

RDD 타입 값 변환하기

  • 어떤 함수들은 특정한 타입의 RDD에서만 쓸 수 있다.(mean(), variance(), 키/값 페어 RDD 의 join()) 이런 기능들을 사용하려면 제대로 특정 클래스를 받아쓰고 있는지 확인을 해야한다.

영속화 (캐싱)

  • RDD는 호출하는 액션들에 대한 모든 의존성을 재연산
  • 이를 방지하기 위해서 영속화 요청 할 수 있다.
  • LRU(Least Recently Used)를 사용해 관리 import org.apache.spark.storeage.StoreageLevel val result = input.map( x=> x * x) result.persist(StoreageLevel.DISK_ONLY) println(result.count()) println(result.collect().mkString(","))

level MEMORY_ONLY MEMORY_ONLY_SER MEMORY_AND_DISK MEMORY_AND_DISK_SER DISK_ONLY

키/값 페어로 작업하기

  • 집합 연산 등에 빈번하게 사용
  • 파티셔닝을 통해 노드간 통신비용 줄임

Pair RDD 생성

val pairs = lines.map(x => (x.split(" ")(0), x))

Pair RDD의 트랜스포메이션

기본 RDD에서 가능한 모든 트랜스포메이션을 사용할 수 있다. 단, 페어 RDD는 튜플을 가지므로, 개별 데이터를 다루는 함수 대신 튜플을 처리하는 함수를 전달

페어 RDD의 트랜스포메이션

(예: {(1,2),(3,4),(3,6)})

reduceByKey(func)  : 동일 키에 대한 값들을 합친다.
rdd.reduceByKey((x,y) => x+y)
result : {(1,2),(3,10)}

groupByKey() : 동일 키에 대한 값들을 그룹화 한다.
rdd.groupByKey()
result: {(1,[2]), (3,[4,6])}

combineByKey()

  • 키별 집합 연산 함수 중 일반적으로 쓰인다. 대부분의 다른 키별 컴바이너들은 이를 기반으로 구현
  • aggreage()와 마찬가지로 combinByKey() 또한 입력 데이터와 동일한 타입의 값을 되돌려 줄 필요 없다.

처리

  • 한 파티션 내의 데이터들에 대해 이전 데이터와 같은 키는 mergeValue() 함수를 해당 키에 대한 어큐뮬레이터의 현재 값과 새로운 값에 적용해서 합친다.
  • 각 파티션으로 부터 결과를 최종적으로 합칠 때 둘 이상의 파티션이 동일 키에 대한 어큐뮬레이터를 갖고 있다면 mergeCombiners()를 써서 합쳐지게된다.
vall result = input.combineByKey() (
(v) => (v,1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 +1),
(acc1:(Int, Int),  acc: (Int, Int)) =>(acc1._1 + acc2._1,  acc1._2 + acc2._2))
.map { case(key, value) => (key, value._1 / value._2.toFloat)}

result.collectAsMap().map(println(_))

mapValues(func) :키의 변경 없이 페어 RDD의 각 값에 함수를 적용한다.
rdd.mapValues(x => x+1)
result : {(1,3),(3,5),(3,7)}

flatMapValues(func) : 페어 RDD의 각 값에 대해 반복자를 리턴하는 함수를 적용하고, 리턴받은 값들에 대해 기존 키를 써서 키/값 쌍을 만든다. 종종 토큰 분리에 쓰인다.

rdd.flatMappValues(x=> (x to 5))
result :{(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)}

keys() : RDD가 가진 키들만을 되돌려준다.
rdd.keys()
result: {1,3,3}

values() : RDD가 가진 값들을 되돌려 준다.
rdd.values()
result : {2,4,6}

sortByKey() :키로 정렬된 RDD를 되돌려 준다.
rdd.sortByKey()
result : {(1,2),(3,4),(3,6)}

두 페어 RDD에 대한 트랜스포메이션

rdd={(1,2),(3,4),(3,6)}, other={(3,6)}

subtractByKey : 다른 쪽 RDD에 있는 키를 써서 RDD의 데이터를 삭제한다.
rdd.subtractByKey(other)
result : {(1,2)}

join : 두 RDD에 대해 이너조인(inner join)을 수행

rdd.join(other)
result: {(3,(4,9)),(3,(6,9))}

rightOuterJoinn : 두 RDD에 있는 키들을 대상으로 두 RDD 간에 조인을 수행한다.
rdd.rightOuterJoin(other)
result : {(3,(Some(4),9)),3,(Some(6),9))}

leftOuterJoinn : 다른 쪽 RDD에 있는 키들을 대상으로 두 RDD 간에 조인을 수행한다.
rdd.leftOuterJoin(other)
result : {1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9)))}

cogroup : 동일 키에 대해 양쪽 RDD를 그룹화 한다.
rdd.cogroup(other)
result : {(1,([2],[])), (3, ([4,6],[9]))}

Filter

pairs.filter{case (key, value) => value.length < 20}
- 스파크는 map{case(x,y):(x,func(y))} 과 동일한 mapValues(func)함수를 제공

집합연산

키별 평균 구하기

rdd.mapValues(x =>(x,1)).reduceByKey((x,y) => (x._1 + y._1, x._2 + y>-2))
result
키        값
panda  0
panda  1
pink     4
pink     3

mapValues
키         값
panda   (0,1)
panda   (1,1)
pink      (3,1)
pink      (4,1)

reduceByKey
키           값
panda    (1,2)
pink       (7,2)
  • 맵리듀스의 컴바이너 개념에 익숙한 사람들은 reduceByKey()나 foldByKey()를 호출하는 것이 각 키에 대한 총합을 계산하기 전에 각 머신에서 자동적으로 병합을 수행한다는 것에 주의해야 한다. 사용자가 컴바이너를 지정해 줄 필요가 없다.
  • 좀 더 범용적인 combineByKey() 같은 것을 쓰면 병합 로직을 직접 만들어 줄 수도 있다.

스칼라에서의 단어 세기

val input = sc.textFile("s3://...")
val word = input.flatMap(x => x.split(" "))
val result = words.map(x => (x,1)).reduceByKey((x,y) => x + y)
  • 첫 번쨰 RDD에 countByValue()를 적용하면 더 빠르게 단어를 셀 수 있다.
- input.flatMap( x=> x.split(" ")).countByValue()

병렬화 수준 최적화

  • 스파크는 클러스터 사이즈에 맞는 적절한 파티션 개수를 찾는 방식으로 동작
  • 더 나은 퍼포먼스를 내기위해 병렬화 수준을 직접 정해줄 수 있다.
val data = Seq(("a",3), ("b",4), ("a",1))
sc.parallelize(data).reduceByKey((x,y) => x +y) // 기본 병렬화 수준 사용
sc.parallelize(data).reduceByKey((x,y) => x +y, 10) //병렬화 수준 지정
  • repartition() : RDD의 파티셔닝을 바꾸고 싶을 때 사용하고 새롭게 파티션을 구성하므로 노드간 네트워크로 데이터 교환이 일어난다(셔플링) 그러므로 비용이 큰작업이다. 스파크는 또한 repartition()의 최적화 버전인 coalese() 제공하고 이는 RDD의 파티션 개수를 줄이는 경우에 한해서는 데이터 이동이 발생하지 않는다. rdd.partitions.size(), 로 현재 파티션의 개수를 파악할 수 있따.

데이터 그룹화

  • groupByKey() : RDD의 키를 사용해서 데이터를 그룹화한다. K 타입의 키와 V 타입의 값을 가진 RDD라면[K, Iterable[V]]타입의 RDD를 되돌려 준다.

  • groupBy() : 쌍을 이루지 않았거나 현재 키와 관계되지 않은 다른 조건을 써서 데이터를 그룹화하고자 하는 경우에 쓰인다.

데이터 정렬

val input: RDD[(Int, Venue)] =...
implicit val sortIntegersByString = new Ordering[Int] {
     override def compare(a:Int, b:Int) = a.toString.compare(b.toString)
}
rdd.sortByKey(sortIntegerByString)

페어 RDD에서 쓸 수 있는 액션

함수

countByKey() : 각 키에 대한 값의 개수를 센다
rdd.countByKey()
result : {(1,1), (3,2)}

collectAsMap() : 쉬운 검색을 위해 결과를 맵 형태로 모은다.
rdd.collectAsMap()
result :Map{(1,2),(3,4),(3,6)}

lookup(key) : 들어온 키에 대한 모든 값을 되돌려 준다.
rdd.lookup(3)
result : [4,6]

데이터 파티셔닝(고급)

//코드 초기화, 사용자 정보를 HDFS의 하둡 시퀀스 파일에서 읽어온다. //이는 찾아낸 HDFS 블록에 맞춰 userData의 요소들을 적절히 분산시키지만, //특별히 스파크에 어떤 ID가 어떤 파티션에 존재하는지에 대해서 정보를 제공하지는 않는다..

val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()

//지난 5분간의 이벤트 로그 파일을 처리하기 위해 주기적으로 불리는 함수. //여기서 처리하는 시퀀스 파일이 (UserID, LinkInfo) 쌍을 갖고 있다고 가정한다.

def processNewLogs(logFileName: String) {
     val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
          val joined = userData.join(events) //(UserID, (UserInfo, LinkInfo))를 페어로 가지는 RDD

     val offTopicVisits = joined.filter {
          case(userId, (userInfo, linkInfo)) =>!userInfo.topics.contains(linkInfo.topic)
     }.count()
     println("Number of visits to non-subscrived topics : " + offTopicVisits)
}
- 개선 방법은 partitionBy()를 사용
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
                         .partitionBy(new HashPartitioner(100))
                         .persist()
  • userData.join(events)를 호출할 때 스파크는 오직 events RDD만 셔플해서 이벤트 데이터를 각각의 UserID와 맞는 userData 해시 파티션이 있는 머신으로 전송할 것이다. 결과는 매우 적은 네트워크 비용을 쓰지만 속도는 매우 빠르다.

5장 데이터 불러오기/저장하기

파일포맷

  • 일반적으로 지원되는 파일 포맷
포멧이름           구조화여부                                       비고
JSON                 일부             고전적인 일반 텍스트 파일, 한 라인을 한 레코드로 간주한다.
CSV                  예               범용적인 텍스트 기반 포멧이며 반구조화, 라인을 레코드 하나로 처리
시퀀스파일            예               키/값 데이터를 위한 일반적 하둡 파일 포멧
프로토콜 버퍼         예               빠르고 효율적으로 공간을 쓰는, 다중 언어를 지원하는 포멧
오브젝트 파일         예               공유된 코드끼리 스파크 작업의 데이터를 저장하는 데에 유용하다. 
                                      하지만 이는 자바의 객체 직렬화에 의존한 방법으로 
                                      클래스를 변경하면 깨질 위험이 있다.

textFile

val input = sc.textFile("file://home/holden/repos/spark/README.md")

wholeTextFile

  • 파일 사이즈가 메모리에 모두 불러들여 작업 할 경우 사용
스칼라로 파일별 평균값 구하기
val input = sc.wholeTextFile("file://home/holden/salesFiles")
val result = input.mapValues{y =>
     val nums = y.split(" ").map(x => x.toDouble)
     nums.sum /nums.size.toDouble
}

와일드 카드 지원

  • 큰 데이터 세트가 여러 파일에 흩어져 있을 경우, 특히 파일들이 동일 디렉터리에 섞여 있다면 유용 ex) part-*.txt

텍스트 파일 저장하기

result.saveTextFile(outputFile)

JSON

  • 텍스트 파일로 불로온 뒤 JSON 파서를 써서 값들을 매핑
  • 직렬화 라이브러리 사용하여 문자열 처리

JSON 불러오기

  • 여러 라인으로 이루어진 JSON 데이터 파일이라면 전체 파일을 불러와서 각 파일별로 파싱해야함.
  • mapPartition()써서 파서를 재사용
스칼라에서 JSON 불러오기
- Person 클래스의 인스턴스에 레코드를 불러와 담는 예제
//최상위 레벨 클래스여야 함..
     case class Person(name: String, lovesPandas: Boolean) 
//파싱하여 지정된 케이스 클래스에 담는다. 문제가 생기면 빈 리스트(None)를
//되돌려 주기 위해 flatMap을 사용했으며 모두 괜찮다면 아이템 하나가 들어 있는 리스트를
//되돌려 준다(Some(_)).
     val result = input.flatMap(record, classOf[Person]))
     try {
          Some(mapper.readValue(record, classOf[Person]))
     } catch {
          case e : Exception => None
     }})

JSON 저장하기

result.filter(p => p.lovePandas).map(mapper.writeValueAsString(_))
.saveAsTextFile(outputFile)

쉼표 구분 데이터와 탭 구분 데이터(CSV, TSV)

  • 레코드들은 대개 한 줄에 하나씩 저장되지만 늘 그런 것은 아니며 간혹 여러 줄에 걸쳐 있을 수도 있다. CSV와 TSV 파일들에서 간혹 그런 일관성이 꺠질 수 있는데 가장 빈번하게 개행 문자다루기, 이스케이프 문자 다루기 비아스키 문자 다루기, 정수가 아닌 숫자 다루기등에 관한 것들 때문이다.
  • 하둡의 InputFormat 구현 중의 하나인 CSVInputFormat(http://bit.ly/1FigUkg)을 써서 스칼라 및 자바에서 CSV 데이터를 불러올 수도 있지만, 이는 개행을 포함하는 레코드는 지원하지 않는다.

스칼라에서 전체적으로 CSV 불러오기

val input = sc.wholeTextFiles(inputFile)
val result = input.flatMap{ case (_, txt) =>
          val reader = new CSVReader(new StringReader(txt));
          reader.readAll().map(x  => Person(x(0), x(1)))
}
  • 입력 파일 개수가 매우작고 wholeFile() 메소드를 쓸 필요가 있다면 스파크가 추후 연산을 효과적으로 병렬화하도록 입력을 재파티션하는 것이 좋을 수 있다.

CSV 저장하기

pandaLovers.map(person => List(person.name, person.favoriteAnimal).toArray)
.mapPartitions{ people =>
     val stringWriter = new stringWriter();
     val csvWriter = new CSVWriter(stringWriter);
     csvWriter.writeAll(people.toList)
     Iterator(stringWriter.toString)
}.saveAsTextFile(outFile)

시퀀스 파일

  • 키/값 쌍의 비중첩 파일로 구성된 인기 있는 하둡의 파일 포맷
  • 동기화 표시를 갖고 있어서 스파크가 그 부분까지 파일 탐색을 했을때 필요한 레코드 경계까지만 재동기화 가능
  • Writable 인터페이스를 구현한 데이터들로 구성
  • 하둡의 RecordReader는 동일한 객체를 레코드마다 재사용하므로 데이터를 읽어들이는 RDD에 직접 cache를 호출하면 실패 할 수 있다. 대신 간단한 map() 연산을 추가해서 그 결과를 캐시하자. 게다가 많은 하둡의 Writable류 클래스들은 java.io.Serializable을 구현하지 않으므로 그것들이 RDD에서 동작하게 하기 위해서라도 map()으로 변환 할 필요가 있다.

시퀀스 파일 불러오기

-스칼라에서 시퀀스 파일 불러오기
val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]).
map{case(x,y) => (x.toString, y.get())}
  • 스칼라에서는 Writable 타입들을 자동적으로 적절한 스칼라 타입으로 바꿔 주는 편리한 함수가 있다. keyClass와 valueClass를 지정해 주는 대신, sequenceFile[Key,Value](path, minPartitions)를 호출하면 기본 스칼라 타입의 RDD를 되돌려 준다.

시퀀스 파일 저장하기

val data = sc.parallelize(List("Panda",3), ("key",6),("snail",2))) data.saveAsSequenceFIle(outputFile)

오브젝트 파일

  • 시퀀스 파일에 대해 단순한 포장을 더해 값만을 가진 RDD
  • 자바 직렬화를 사용
  • 만약 클래스가 변경된다면 예를 들어, 필드가 추가되거나 삭제된다면 - 이전에 만들어진 오브젝트 파일은 더 이상 읽어들일 수 없다.
  • 오브젝트 파일 저장은 RDD에서 saveAsObjectFile만 부르면 되므로 간단하다. 읽는 것도 꽤 간단하다. SparkContext의 objectFile()함수에 경로를 전달하면 RDD를 되돌려 준다.

하둡 입출력 포멧

  • 신/구 하둡API를 모두 지원

다른 하둡 입력 포멧으로 불러오기

  • 새로운 하둡 API로 파일을 읽으려면 newAPIHadoopFile은 경로와 함께 세가지 클래스를 받아 들인다.
  1. 포멧 클래스
  2. 키를 위한 클래스
  3. 값을 나타 내는 클래스
- KeyValueTextInputFormat이 있는데 키/값 데이터를 텍스트 파일로 부터 읽기 위해 쓰인다. 각 라인은 개별적으로 처리되며 키와 값은 탭문자로 구분된다. 
스칼라에서 구버전 API로 KeyValueTextInputFormat()불러오기
val input = sc.hadoopFile[Text, Text, KeyValueTextInputFormat] (inputFile).map{
                    case(x,y) => (x.toString, y.toString)
}
- 스칼라에서 엘리펀트 버드로 LZO로 압축되 JSON 데이터 불러오기
val input = sc.newAPIHadoopFile(inputFile, classOf[LzoJsonInputFormat],
classOf[LongWritable], classOf[MapWritable], conf)
//"input"에서 각 MapWritable은 JSON객체로 표현된다.
  • LZO 사용은 hadoop-lzo 패키지 설치가 필요하며 스파크가 그 원천 라이브러리를 가리키도록 해야 한다. 데비안 패키지를 설치한다면 --driver-libaray-path/usr/lib/hadoop/lib/native/ --driver-class-path /usr/lib/hadoop/lib/를 spark-submit 호출 시 옵션으로 주는 것이 한 방법이다.

하둡 출력 포맷으로 저장하기

파일 시스템 외의 데이터 소스

예제: 프로토콜 버퍼

  • 깔끔하게 정의된 타입과 필드를 가지는 구조화된 데이터 포멧
  • 빠른 변환

파일 압축

파일 시스템

  • 스파크는 다양한 파일 시스템에 대해 읽고 쓰기를 지원하며 원하는 어떤 파일 포멧으로도 작업할 수 있다.

로컬/"일반" FS

  • NFS, AFS, 맵알의 NFS 레이어 같은 네트워크 파일 시스템들은 사용자에게 일반 파일 시스템처럼 보이게 된다. 만약 데이터가 이미 이런 시스템에 있다면 단순히 file:// 경로로 지정해서 사용할 수 있다. 스파크는 각 노드마다 동일한 경로에 데이터가 마운트되어 있는 한 처리가 가능

스칼라에서 로컬 파일 시스템에 있는 압축 텍스트 파일 불러오기

val rdd = sc.textFile("file://home/holden/happypands.gz")

아마존 S3

HDFS

스파크 SQL로 구조화 데이터 다루기

아파치 하이브

- 스칼라에서 HIveContext를 생성하고 데이터 가져오기
import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)
val rows = hiveCtx.sql("Select name, age From users")
val firstRow = rows.first()
println(firstRow.getString(0)) //0번 필드가 'name' 칼럼이다

JSON

  • 데이터가 레코드마다 일관된 스키마를 갖고 있다면 스파크SQL은 스키마를 예상하여 각 레코드를 불러올 수 있는데 필요한 필드 데이터만 뽑아 오는 것을 쉽게 만들 수 있다.
스칼라에서 스파크 SQL로 JSON 읽기
val tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
val results = hiveCtx.sql("select user.name, text from tweets")

데이터베이스

자바 데이터 베이스 연결

스칼라에서 jdbcRDD

def createConnection() = {
     Class.forName("com.mysql.jdbc.Driver").newInstance();
     DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");
}

def extractValues(r: ResultSet) = {
     (r.getInt(1), r.getString(2))
}

val data = new JdbcRDD(sc,
     createConnection, "select * from panda where ? <= id and id <= ?",
     lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
println(data.collect().toList)

jdbcRDD 는 몇 가지 인자를 받아 들인다. - 일단, 데이터베이스에 접속할 수 있는 함수를 제공해야 한다. 이 함수로 각 노드가 데이터를 불러오기 위한 자체적인 연결 설정 후 연결을 맺을 수 있다. - 다음으로, 원하는 범위의 데이터를 읽을 질의와 함께 질의에 대한 속성으로 lowerBound 와 upperBound를 넣어준다. 이 속성들은 스파크가 다른 머신들에서 다른 데이터 범위를 읽어 올 수 있게 함으로써 단일 노드에서 전체 데이터를 읽느라 병목이 되지 않게 해 준다. -마지막 인자는 각 레코드의 출력을 java.sql.ResultSet 에서 데이터를 처리하기 편한 포맷으로 바꿔 주기 위한 함수이다.

카산드라

val conf = new SparkConf(true)
               .set("spark.cassandra.connection.host", "hostname")
val sc = new SparkContext(conf)

스칼라에서 키/값 데이터를 가진 RDD로 전체 데이블 불러오기
//SparkContext와 RDD에 함수를 추가
import com.datastax.spark.connector._

//RDD로 전체 데이블을 읽는다. 테이블은 다음처럼 만들어진 걸로 가정한다.
//Create table test.kv(key, text primary key, value int);
val data = sc.cassandraTable("test", "kv")
//기본 데이터 통계 출력
data.map(row => row.getInt("value")).stats()

스칼라에서 카산드라에 저장하기
val rdd = sc.parallelize(List(Seq("moremagic", 1)))
rdd.saveToCassandra("test", "kv", SomeColumns("key", "value"))

HBase

HBase에서 읽어 오는 스칼라 예제

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tablename")
//어느 테이블을 읽을지 지정

val rdd = sc.newAPIHadoopRDD(
     conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable],
classOf[Result])

일래스틱 서치

스칼라에서 일래스틱 서치 출력
 val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.mr.EsOutputFormat")
jobConf.setOutputCommitter(classOf[FileoutputCommitter])
jobConf.set(ConfigurationOptions.ES_RESOURCE_WRITE, "twitter/tweets")
jobConf.set(ConfigurationOptions.Es_NODES, "localhost")
FileOutputFormat.setOutputPath(jobConf, new Path("_"))
output.saveAsHadoopDataset(jobConf)

스칼라에서 일래스틱서치 입력

def mapWritableToInput(in: MapWritable): Map[String, String] = {
     in.map{case(k,v) => (k.toString, v.toString)}.toMap
}

val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args(1))
jobConf.set(ConfigurationOptions.ES_NODES, args(2))
val currentTweets = sc.hadoopRDD(jobConf,
     classOf[EsInputFormat[Object, MapWritable]], classOf[Object],
     classOf[MapWritable])
//맵만 추출한다,
//MapWritable[Text, Text]를 Map[String, String]으로 변환한다.
val tweets = currentTweets.map{case(key, value) => mapWritableToInput(value)}

6장 고급 스파크 프로그래밍

  • 어큐뮬레이터 : 누산
  • 브로드캐스트 변수 : 분산

어큐뮬레이터

  • 작업 노드에서 드라이버 프로그램으로 보내는 값의 집합 연산에 대해 간단한 문법제공
  • 디버깅 목적
val file = sc.textFile("file.txt")
//0으로 초기화한 Accumulator[Int]를 생성한다.
val blankLines = sc.accumulator(0)

val callSigns = file.flatMap(line => {
     if(line == "" {
          blankLines += 1 // 어큐뮬레이터에 더한다.
     }
     line.split(" ")

callSigns.saveAsTextFile("output.txt")
//saveAsTextFile을 실행 된 다음에 얻을 수 있다
//saveAsTextFile이 액션이므로 액션이 실행된 후에 되기 때문이다.
println("Blank lines: " + blankLines.value)

요약

  • 드라이버에서 SparkContext.accumlator(initalValue) 메소드를 호출하여 초기 값을 가진 어큐뮬레이터를 만든다. 반환 타입은 org.apache.spark.Accumulator[T] 객체이면 T는 초기값의 타입이다.
  • 스파크 클러저의 작업 노드 코드에서 어큐뮬레이터에 += 메소드를 써서 값을 더 한다.
  • 드라이버 프로그램에서 value 속성을 불러 어큐뮬레이터의 값에 접근한다.
  • 작업 노드의 태스크는 어큐뮬레이터의 value에 접근 할 수 없다는 점을 기억하라. 즉 이 태스크의 관점에서 어큐뮬레이터는 '쓰기 전용' 변수인 셈이다. 이는 어큐뮬레이터가 매 업데이트마다 통신할 필요가 없도록 효율적으로 구현 되어 있기 때문이다.
파이썬에서 어큐뮬레이터로 에러 세기
#콜사인 검증을 위한 어큐뮬레이터를 만든다.
validSignCount = sc.accumulator(0)
invalidSignCount = sc.accumulator(0)

def validateSign(sign):
     global validSignCount, invalidSignCount
     if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4|}[a-zA-Z]{1,3}\Z", sign):
          validSignCount += 1
          return True
     else:
          invalidaSignCount +=1
          return False
#각 콜사인마다 접속한 횟수를 센다
validSign = callSign.filter(validateSign)

contactCount = validSigns.map(lambda sign: (sign,1)).reduceByKey(lambda (x,y): x+y)

#연산을 시도하여 어큐뮬레이터가 계산되게 한다.
contactCount.count()
if invalidSignCount.value < 0.1 * validSignCount.value:
     contactCount.saveAsTextFile(outputDir + "/contactCount")
else:
     print "Too many error: "%d in %d" %
           (invalidSignCount.value, validSignCount.value)

어큐뮬레이터와 장애 내구성 어큐뮬레이터는 이와 어떻게 동작하는 것일까? 최종 결과는 액션에 사용되었던 어큐뮬레이터들에 대한 것이며, 각 태스크의 업데이트는 스파크에 의해 각 어큐뮬레이터에 한 번씩만 반영된다. 즉, 장애나 반복 연산의 횟수와 관계없이 절대적으로 믿을 만한 값을 얻기 원한다면 어큐뮬레이터를 foreach() 같은 액션 안에 넣어야 한다.

액션이 아닌RDD 트랜스포메이션에 사용되는 어큐뮬레이터에 대해서는 이런 보장을 할 수가 없다. 따라서 트랜스 포메이션 안에서 어큐뮬레이터는 디버깅 목저으로만 쓰여야 한다.

사용자 지정 어큐뮬레이터

  • 스파크는 사용자가 직접 정의한 어큐뮬레이터 타입이나 집합 연산을 만들 수 있는 API도 제공한다. http://bit.ly/1eAcpGW
  • 이 외에도 교환 법칙과 결합 법칙이 성립하는 추가 연산은 모두 쓸 수 있다.

브로드캐스트 변수

  • 스파크 연산에 쓸 크고 읽기 전용인 값을 모든 작업 노드에 효과적으로 전송하는 데에 쓴다.
스칼라에서 브로드캐스트 변수를 쓴 국가 검색
#RDD 의 contactCounts 에서 콜 사인의 위치를 검색한다.
#이 검색 작업을 위해 각 국가 코드의 콜 사인 접두어
#리스트를 불러온다.
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map{case (sign, count) =>
     val country = lookupInArray(sign, signPrefixes.value)
     (country, count)
}.reduceByKey((x,y) => x+y)
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")

브로드 캐스트 변수를 쓰는 절차

  • T 타입의 객체에 SparkContext.broadcast를 호출하여 Broadcast[T]를 만든다. Serializeable 이라면 어떤 객체든 가능하다.
  • value 속성으로 값에 접근한다
  • 변수는 각 노드에 한 번만 보내지며 읽기 전용으로 취급된다.

브로드캐스트 최적화 바이트 사이즈가 큰 값들을 브로드캐스팅할 때 값을 직렬화하거나 직렬화된 값을 네트워크로 보내는 시간이 오래 걸린다면 해당 부분이 쉽게 병목이 될 수 있으므로 빠르고 작은 크기의 데이터 직렬화 포멧을 선택하는 것이 중요하다. 스파크의 스칼라나 자바 API에서 기본적으로 쓰는 자바 직렬화는 기본 타입의 배열을 제외하고는 특히 더 비효율적이다.

파티션별로 작업하기

  • 파티션별로 작업하는 것은 데이터 아이템에 대해 셋업 절차의 반복을 피하게 해준다. 난수 생성기 객체를 생성하는 작업 등이 각 아이템보다 반복될 여지가 있는 셋업의 예가 될수 있다. 스파크는 파티션 기반 버전의 map과 foreach를 제공하여 RDD의 각 파티션들에서 한 번만 코드를 실행하게 해 줌으로써 그런 작업들에 대한 비용을 줄여준다.
val contactsContactLists = validSigns.distinct().mapPartitions{
     signs =>
     val mapper = createMapper()
     val client = new HttpClient()
     client.start()
     //http 요청 생성
     signs.map {sign =>
          createExchangeForSign(sign)
     //응답 가져옴
     }.map{ case (sign, exchange) =>
          (sign,  readExchangeCallLog(mapper, exchange))
     }.filter(x=> x._2 != null) // 빈 콜 로그 삭제
}

파티션 별로 작업을 할 경우에 스파크는 사용자가 만든 함수에 그 파티션의 데이터가 담긴 Iterator를 준다. 값을 리턴하기 위해서는 Iterable을 되돌려 주어야한다. mapParitions()외에도 여러개의 파티션별 연산자를 갖고 있다.

파티션별 연산을 지원하는 연산자 mapPartitions(), mapParititonWithIndex(), foreachPartition()

외부 프로그램과 파이프로 연결하기

  • pipe() 이용
스칼라에서 pipe()로  finddistance.R을 호출하는 드라이버 프로그램

//외부 R  프로그램으로 각 콜의 거리를 계산하고
//이 작업에서  각 노드가 다운로드받을 파일 리스트에 스크립트를 추가한다.
val distScript = "./src/R/finddistance.R"
val distScriptName = "finddistance.R"
sc.addFle(distScript)
val distances = contactsContactLists.values.flatMap(x=>
                    x.map(y => s"$y.contactlat, $y.contactlong,$y.mylat,$y.mylong*)).pipe(Seq(sparkFiles.get(distScriptName)))
println(distances.collect().toList)

스크립트가 사용 가능해지면 RDD 의 pipe() 메소드를 통해 RDD의 데이터를 파이프로 연결 할 수 있다.

  • rdd.pipe(Seq(SparkFiles.get("finddistance.R"),","))
  • rdd.pipe(SparkFIles.get("finddistance.R")+",")

수치 RDD 연산들

count() : RDD가 갖고 있는 데이터 개수
mean()  : 데이터 값들의 평균
sum()    : 총합
max()
min()
variance() :  값들의 분산
sampleVariance() : 표본값에 대한 분산
stdev() : 표준편차
sampleStdev() : 표본값에 대한 표준 편차

스칼라에서 먼 거리의 로그 걸러내기 //이제 일부 잘못 보고된 위치일 가능성이 있는 먼 거리의 로그들은 걸러 낼 수 있다. //우선 문자열 RDD를 받아 double 형태로 바꾼다.

val distanceDouble = distance.map(string => string.toDouble)
val stats = distanceDoubles.stats()
val stddev = stats.stdev

클러스터에서 운영하기

스파크 실행구조

  • 하나의 드라이버와 익스큐터들을 합쳐서 스파크 애플리케이션이라고 부른다.

드라이버

  • 드라이버란 main() 메소드가 실행되는 프로세스를 말한다. 드라이버는 SparkContext를 생성하고 RDD를 만들고 트랜스포메이션과 액션을 실행하는 사용자 코드를 실행하는 프로세스이다.
  1. 사용자 프로그램을 태스크로 변환 내부적으로 연산들의 관계에 대해 논리적인 지향성 비순환 그래프(DAG, Directed Acyclic Graph)를 생성한다. 드라이버가 실행 될때 드라이버는 이 논리 그래프를 물리적인 실행 계획으로 변환한다. 스파크는 맵 트랜스포메이션을 '파이프라이닝' 해서 합치는 등 여러 개의 태스크로 이루어지며 단위 작업들은 묶여서 클러스터로 전송된다.

  2. 익스큐터에서 태스크들의 스케쥴링

  • 물리적 실행 계획이 주어지면 스파크 드라이버는 익스큐터들에서의 개별 작업들을 위한 스케쥴을 조정한다. 익슈큐터들은 시작하면서 드라이버에 등록을 하게 되므로 항상 애플리케이션의 실행에 대해 전체적으로 볼 수 있다. 각 익스큐터는 태스크들을 실행하고 RDD 데이터를 저장하는 프로세스이다.

익스큐터

  • 스파크의 익스큐터는 주어진 스파크 작업의 개별 태스크들을 실행하는 작업 실행 프로세스이다.

클러스터 매니저

  • 얀, 메소스, 내장 매니저가 있다

프로그램 실행하기

  • 사용자는 spark-submit을 사용하여 애플리케이션을 제출
  • spark-submit은 드라이버 프로그램을 실행하고 사용자가 정의한 main() 메소드를 호출한다.
  • 드라이버 프로그램은 클러스터 매니저에게 익스큐터 실행을 위한 리소스를 요청한다.
  • 클러스터 매니저는 드라이버 프로그램을 대신해 익스큐터들을 실행하다.
  • 드라이버 프로세스가 사용자 애플리케이션을 통해 실행된다. 프로그램에 작성된 RDD의 트랜스포메이션과 액션에 기반하여 드라이버는 작업 내역을 단위 작업 형태로 나눠 익스큐터들에게 보낸다.
  • 단위 작업들은 결과를 계산하고 저장하기 위해 익스큐터에 의해 실행된다.
  • 드라이버 main()이 끝나거나 SparkContext.stop()이 호출된다면 익스큐터들은 중지되고 클러스터 매니저에 사용했던 자원을 반화한다.

spark-submit을 써서 애플리케이션 배포하기

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

스파크 애플리케이션 간의 스케쥴링

실제 환경에서는 여러 사용자 사이에서 공유

  • 다중 접속 환경 클러스터에서의 스파크는 기본적으로 클러스터 매니저가 스파크 애플리케이션들 사이에서 자원공유 및 관리 기능에 의존하여 스케쥴링한다. 많은 클러스터 매니저들은 우선순위나 처리 용량 한계에 따른 작업 큐들을 정의 할 수 있는 기능이 있다.
  • 페어 스케줄러는 장시간 동작 애플리케이션이 작업 ㅅ케줄링을 위해 우선순위 조정을 할 수 있도록 큐를 제공.

클러스터 매니저

  1. 단독 클러스터 매니저
  • 이는 하나의 마스터와 여러 개의 작업자로 구성되며, 각각은 설정에 따른 용량의 메모리와 CPU 코어 개수만큼을 사용한다. 사용자는 애플리케이션을 제출할 때 익스큐터가 메모리를 얼마나 쓸지뿐만 아니라 모든 익스큐터가 사용할 총 코어 개수도 지정할 수 있다.
  1. 단독 클러스터 매니저 실행

  2. 애플리케이션 제출하기

  • spark-submit --master spark://masternode:7077 yourapp
  1. 자원 사용량 설정
  1. 익스큐터 메모리
  • spark-submit 실행시 --executor--memory 를 써서 이를 지정할 수 있다. 각 애플리케이션은 각 작업노드에 최대 하나의 익스큐터를 갖게 되므로 이 설정은 결국 애플리케이션이 각 작업 노드에서 메모리를 얼마나 쓸지를 결정하게 된다. 기본적으로 이 설정은 1GB이며 대개의 경우는 더 많은 양의 설정을 쓰게 된다.
  1. 코어 수의 최대총합
  • 이는 애플리케이션이 모든 익스큐터가 사용할 코어 개수의 총합이다. 기본 설정은 무한대이다. 다시 말하면, 애플리케이션은 클러스터에서 가능한 모든 머신에 익스큐터를 실행한다. 하지만 다중 사용자가 작업을 한다면 사용량을 제한해야한다. 이 값은 spark-submit에 --total--executor--cores인자를 지정하거나 스파크 설정 파일에서 spark.cores.max를 설정해서 제어 할 수 있다.

하둡 얀

  • 스파크에서 얀을 쓰는 것은 직관적이다. 하둡 설정 디렉터리를 가리키도록 환경변수를 설정하고 spark-submit에 특별한 마스터 URL로 작업을 제출하면 된다.
  1. 자원 사용량 설정
  • 얀에서 스파크를 실행할 때 애플리케이션은 spark-submit이나 spark-shell 등등에 --num-executors 플래그로 지정해 준 개수의 익스큐터만을 사용한다. 기본 설정은 2이므로 대개의 경우는 이 값을 올려야 한다. 또한 각 익스큐터가 사용할 메모리는 --executor-memory로 설정할 수 있으며 --executor-cores로 얀에 요청할 코어 개수를 설정한다.

아파치 메소스

  • 아파치 메소스는 클러스터에서 분석 워크로드나 장시간 동작 서비스 모두 사용할 수 있게 해주는 범용 목적의 클러스터 매니저이다.
  • spark-submit --master mesos://masternode:5050 yourapp

어떤 클러스터 매니저를 써야 할까?

  • 새로 배포할 예정의 애플리케이션이라면 단독 클러스터로 먼저 시작하자. 단독 모드는 셋업이 가장 쉽고 스파크만 돌릴 경우 다른 클러스터 매니저들이 제공하는 거의 모든 기능을 동일하게 제공한다.
  • 스파크를 다른 애플리케이션들과 같이 돌리고 싶거나 좀 더 우수한 자원 스케줄링 기능을 써야 한다면(큐 기능 같은) 얀과 메소스로 가능하다. 특히 얀은 많은 하둡 배포판에 함께 설치 되는 경우가 많다.
  • 얀이나 단독모드에 비해 메소스가 가지는 장점은 스파크 셀 같은 대화형 애플리케이션들의 명령 실행 간에 CPU 사용량을 자동으로 낮추는 세밀한 공유 옵션이다. 이는 다양한 사용자가 대화형 셀을 사용하는 환경에서는 특히 매력적이다.
  • 모든 경우에 있어서 스파크는 저장소에 빠른 접근을 위해 HDFS와 동일한 노드에 설치되는 것이 최상이다. 메소스나 단독 클러스터 매니저는 동일 노드에 수동으로 설치할 수 있으며 대부분의 하둡 배포판은 이미 얀과 HDFS를 함께 설치하고 있다.

스파크 최적화 및 디버깅

SparkConf로 스파크 설정하기

  • SparkConf 값들은 애플리케이션 코드 안에 프로그래밍으로 정의 한다.
스칼라에서 SparkConf를 이용하여 애플리케이션 생성
val conf = new SparkConf()
conf.set("spark.app.name", "My Spark App")
conf.set("spark.master", "local[4]")
conf.set("spark.ui.port", "36000") //기본 포트 재정의
//이 설정으로 SparkContext를 만든다.
val sc = new SparkContext(conf)

spark-submit

-애플리케이션에 설정값을 동적으로 줄 수 있다. -파일에서 읽는 것도 지원한다.

  • 우선순위 : SparkConf > spark-submit > 설정 파일 > 기본 값
플래그를 사용하여 실행 시의 설정값 지정
bin/spark-submit \
--class com.example.MyApp \
--master local[4] \
--name "My Spark App" \
--conf spark.ui.port=36000 \
myApp.jar

기본값 파일을 사용한 실행 시의 설정값 지정
bin/spark-submit \
--clss com.example.MyApp \
--properties-file my-config.conf \
myApp.jar

my-config.conf의 내용
spark.master local[4]
spark.app.name "My Spark App"
spark.ui.port 36000

일반적인 스파크 설정값

  • spark.executor.memory (익스큐터당 메모리)
  • spark.executor.cores, spark.cores.max(애플리케이션에 사용될 코어 개수)
  • spark.speculation ( 투기적 실행)
  • spark.storage.blockManagerTimeoutIntervalMS (GC로 인해 정지되는 작업들은 이 수치를 100초나 더 높게 하면 중단되는 것을 피할 수있다.
  • spark.execuoter.extraJavaOptionsSpark.executor.extraClasspathSpark.executor.extraLibraryPath (JVM 실행옵션)
  • spark.serializer (직렬화, 카이로 사용)
  • spark.[X].port ( 스파크 애플리케이션을 실행할 때에 정수 포트 값을 설정)
  • spark.eventLog.enabled (이벤트 로깅)
  • spark.eventLog.dir (로깅에 쓰일 저장 장치 주소)

실행을 구성하는 것 : 작업, 태스크, 작업 단계

  • RDD의 가계도를 출력하기 위해 스파크는 toDebugString() 메소드를 제공
  • 액션을 실행하기 전에 RDD들은 단순히 나중에 여유 있는 연산 실행 시에 쓰기 위한 메타 데이터만 저장
  • 액션을 수행시 필요한 RDD 연산의 물리적 실행 계획을 만든다.
  • 스파크의 내부 스케줄러는 RDD가 이미 클러스터 메모리나 디스크에 캐싱되어 있는 경우 RDD 그래프의 가계도를 제거할 수 있다.
  1. 사용자 코드가 RDD의 DAG를 정의한다. RDD의 연산들은 새로운 RDD를 만들고 이것들은 부모를 참조하게 되면서 이에 따라 그래프가 만들어진다.

  2. DAG가 액션의 실행 계획으로 변환되게 한다. RDD에서 액션을 호출하면 그떄는 반드시 연산이 수행되어야 한다. 이는 또한 부모 RDD에게도 연산을 요구하게 된다. 스파크의 스케쥴러는 RDD들이 필요한 모든 연산을 수행하도록 작업을 제출한다. 이 작업은 하나 이상의 작업 단계를 갖게 되며, 이는 태스크들로 구성된 병렬 집단 연산들을 말한다. 각 작업 단계는 DAG에서 하나 이상의 RDD들과 연계된다. 하나의 작업단계가 파이프라이닝에 의해 여러개의 RDD와 연계될 수 있다.

  3. 태스크들이 스케줄링이 되고 클러스터에서 실행된다. 작업 단계들은 순서대로 실행되며 RDD의 조각들을 연산하기 위한 태스크들을 실행한다. 작업의 최종단계가 끝나면 액션이 완료된다.

정보찾기

스파크 웹 UI

  1. job
  • job 페이지는 실행 중이거나 최근에 완료된 작업들에 대한 세부적인 실행 정보를 갖고있다.
  • 이 페이지는 주로 작업의 성능을 보기 위해 쓰인다
  • 관심 있는 작업 단계가 있다면 stage페이지로 들어가서 성능 이슈를 파악 하는데에 도움을 받을 수 있다.
  • 좋은 시작 지점은 태스크의 실행 시간이다.
  • 성능 비대칭을 살펴보는 것 말고도 태스크가 실행되는 각 단계(데이터 읽기, 연산 쓰기)에 걸리는 시간을 파악해 보는 것도 도움이 된다.
  1. Storage
  • Storage 페이지는 영속화되어 있는 RDD에 대한 정보를 보여준다.
  1. Executors
  • 이 페이지는 활성화된 익스큐터들에 대해 작업 처리와 관련된 수치들과 각 익스큐터가 쓰고 있는 저장 장치의 용량을 보여준다
  • Executors 페이지의 다른 기능은 "Thread Dump" 버튼을 써서 실행 중이 익스큐터의 스택 트레이스를 가져올 수 있다. 익스큐터의 콜 스택을 시각화해 보는 것은 정확히 그 시점에 어떤 코드가 실행 중인지를 파악할 수 있게 해준다.
  1. Environment
  • 이 페이지는 스파크 애플리케이션의 실행 환경에서 활성화된 설정 속성들을 나열해 준다.

드라이버와 익스큐터 로그

-로그는 내부적인 경고 메시지나 사용자 코드에서 발생한 예외 같은 비 정상적인 이벤트에 대해 상세한 정보를 담고 있다. 이런 데이터는 에러나 예기치 않은 동작으로 문제를 겪을 떄에 특히 도움이 된다.

스파크 로그의 위치는 배포모드에 따라 다르다.

  • 단독 모드에서 애플리케이션 로그는 마스터 웹 UI에 직접 표시된다. 이들은 기본적으로 각 작업 노드의 스파크 설치 위치의 work/ 아래에 저장된다.
  • 메소스에서 로그는 메소스 슬레이브 노드의 work/ 밑에 저장되며 메소스 마스터 UI로 볼 수 있다.
  • 얀 모드에서 로그를 모으는 가장 간단한 방법은 애플리케이션에서 모은 로그에 대한 보고서 생성하도록 얀의 로그 수집 도구를 사용하는 것이다.

성능에 관한 핵심 고려 사항

병렬화 수준

  • RDD의 논리적인 표현은 객체들의 모음이다. 물리적 실행 간에 RDD는 여러 개의 파티션으로 나뉘고 각 파티션은 전체 데이터의 일부를 갖고 있게 된다. 스파크는 태스크를 스케줄링하고 실행할 때 각 파티션당 저장된 데이터를 처리할 테스크를 하나씩 만들고, 그 태스크는 실행을 위해 클러스터에 기본적으로 하나의 코어를 요청한다.

  • 스파크는 연산을 위해 병렬화 수준을 조정할 수 있는 두 가지 방법을 제공한다 첫 번째는 데이터 셔플이 필요한 연산 간에 생성되는 RDD를 위한 병렬화 정도를 인자로 줄 수 있다. 두 번째는 이미 존재하는 RDD를 더 적거나 더 많은 파티션 갖도록 재배치 할 수 있다. repartition() 메소드는 RDD를 무작위로 섞어 원하는 개수의 파티션으로 다시 나눠 준다. 만약 파티션 개수를 줄인다면 coalesce() 를 쓸 수도 있다. 셔플 작업을 하지 않으므로 repartition() 보다 훨씬 효과적이다. 만약 병렬화 개수가 너무 많거나 너무 적다면 이 메소드들로 데이터를 재분배할 수 있을 것이다.

PySpark 셀에서 큰 RDD로 합치기

와일드 카드 입력은 수천 개 파일이 될 수도 있다. input = sc.textFile("s3n://log-files/2014/*.log") input.getNumpartitions() //35154 대부분의 데이터를 제외시키는 필터링 lines = input.filter(lambda line : line.startswith("2014-10-17")) lines.getNumPartitions() 35154 캐싱하기 전에 RDD의 lines RDD를 합친다. lines = lines.coalesce(5).cache() lines.getNumPartitions() 5이후의 분석 작업은 합쳐진 RDD상에서 실행된다. lines.count()

직렬화 포켓

  • 스파크는 네트워크로 데이터를 전송하거나 디스크에 쓸 때 객체들을 직렬화해 바이너리 포멧으로 변환 시켜야 한다. 이는 셔플 작업 동안 이루어지며 잠재적으로 매우 클 가능성이 있는 데이터들이 전송된다. 기본적으로 스파크는 자바에 내장된 직렬화를 이용한다. 하지만 스파크는 자바 직렬화보다 휠씬 향상된 서드파티 라이브러리인 카이로(Kyro)를 쓰는 것도 지원하는데, 이는 더 빠르면서 더 간편한 바이너리 포멧을 지니고 있지만, 모든 타입의 객체가 자동적으로 직렬화되지는 않는다. 직렬화에 카이로를 적용한다면 거의 모든 애플리케이션에서는 이득을 볼 수 있을 것이다.
#카이로를 사용 설정하고 클래스 등록하기
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KroSErializer")
//클래스 등록을 엄격하게 하도록 한다.
conf.set("spark.kryo.registrationRequired", "true")
conf.registKryoClasses(Array(classOf[MyClass], classOf[MyOtherClass]))

메모리 관리

RDD 저장용

  • RDD 에 persist()나 cache()를 호출 할 때 그 파티션들은 메모리 버퍼에 저장된다. 스파크는 캐싱에 사용되는 메모리 최대치를 JVM의 전체 힙 메모리 대비 spark.storage.memoryFraction에 지정된 비율 만큼으로 제한한다. 이 한계를 초과하면 오래된 파티션들은 메모리에서 제거된다. 셔플 및 집합 연산 버퍼 -셔플 연산 수행 시에 스파크는 셔플 출력 데이터를 저장하는 중간 버퍼를 만든다. 이 버퍼는 집합 연산의 중간 결과를 저장하거나 셔플 결과의 일부분으로 출력될 데이터를 저장하는 용도로 쓰인다. 스파크는 셔플 관련 버퍼 사용 메모리의 총량을 spark.shuffle.memoryFraction으로 제한한다. 사용자 코드
  • 스파크는 임의의 사용자 코드를 실행하게 되므로 사용자 코드 자체도 상당한 양의 메모리를 쓰게 될 수 있다. 예를 들어, 사용자 코드 내에서 큰 배열이나 많은 객체들을 할당하게 된다면 이것들은 전체 메모리 사용에 영향을 끼치게 된다. 사용자 코드는 RDD 저장과 셔플 버퍼 등이 할당된 다음에 JVM 힙에 남아 있는 나머지 메모리 전부를 쓰게 된다.

기본적으로 스파크는 RDD 저장용으로 60%를 할당하고 셔플 메모리에 20%, 사용자 프로그램에 나머지 20%를 배정한다. 경우에 따라서는 이를 조정하여 더 나은 성능을 얻을 수도 있다. 만약 사용자 코드에서 매우 큰 객체를 할당하거나 하다면 저장용이나 셔플 메모리를 줄여서 메모리 고갈을 막을 수 있다.

-cache() : MEMORY_ONLY (LRU적용되며 삭제된 데이터가 필요시 재연산) -persist() : MEMEORY_AND_DISK (RDD 파티션을 디스크에 쓰므로 재연산 비용 절약)

  • 직렬화 : MEMEORY-ONLY-SER나 MEMORY-AND-DISK-SER (가비지 켈렉션에 걸리는 시간을 줄임) 만약 큰 데이터(기가바이트 단위의)를 객체 형태로 캐싱해야 하거나 가비지 컬렉션으로 인해 긴 시간 애플리케이션이 멈추는 현상 이 있다면 이 방법을 고려해 볼만하다. 이런 정지 현상은 애플리케이션 UI에서 각 태스크의 GC TIME 칼럼에서 확인할 수 있다.

하드웨어 프로비저닝

스파크 SQL

스파크 SQL 세가지 주기능

  • 파이썬, 자바, 스칼라에서 DataFrame 추상화 클래스를 제공하는데, 이는 구조화된 데이터세트를 다루는 작업을 간편하게 만들어준다. DataFrame은 관계형 DB의 테이블과 유사한 개념이다.

  • 다양한 구조적 포멧의 데이터를 읽고 쓸 수 있다.

  • 스파크 프로그램 내부에서나 표준 데이터 베이스 연결(JDBC, ODBC)을 제공하는 외부 툴 태블루 같은 BI 툴 등을 써서 스파크 SQL을 통해 SQL로 데이터를 질의 할 수 있다.

스파크 SQL 라이브러리 링크

  • 스파크 SQL은 하둡의 SQL 엔진인 아파치 하이브를 포함하거나 또는 포함하지 않고서도 빌드 가능하다
  • 하이브와 의존성 충돌, 제외, 감추기 방식이 안되면 하이브 없이 스파크 SQL을 링크하여 빌드 가능하다.
  • 추천하는 포인트는 HiveQL과 다른 하이브 기반 기능을 사용할 수 있는 HiveContext이다.

애플리케이션에서 스파크 SQL 사용하기

  • 스파크 SQL을 사용하는 가장 강력한 방식은 스파크 애플리케이션 내부에서 쓰는 것이다. 이는 쉽게 SQL을 써서 데이터를 적재하고 날릴 수 있게 해주면서도 동시에 파이썬, 자바, 스칼라로 작성한 일반적인 프로그램 코드와 같이 사용한다.
스칼라 초기화
//스파크 SQL import
import org.apache.spark.sql.hive.HiveContext
//혹은 하이브 의존성을 쓰지 않는 경우
import org.apache.spark.sql.SQLContext

스칼라에서 SQL 컨텍스트 생성
val sc = new SparkContext(..)
val hiveCtx = new HiveContext(sc)

스칼라에서 트윗 데이터 읽어서 질의하기
val input = hiveCtx.jsonFile(inputFile)
//입력 스키마 RDD 등록
input.registerTempTable("tweets")
//retweetCount 기준으로 트윗들을 가져온다.
val topTweets = hiveCtx.sql("SELECT text, retweetCount From tweets order by retweetCount limit 10")

데이터 프레임

  • 데이터 베이스의 테이블과 비슷하고 RDD 접근 및 map, filter 연산 가능
  • 가장 중요한 것은 어떤 데이터프레임이든 임시 테이블로 등록하여 HiveContext.sql이나 SQLContext.sql로 질의를 날릴 수 있다는 점이다.

데이터프레임과 RDD 간의 변환

val topTweetText = topTweets.rdd().map(row => row.getString(0))

캐싱

hiveCtx.cacheTable("tableName")

데이터 불러오고 저장하기

스칼라에서 하이브 읽기
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new HiveContext(sc)
val rows = hiveCtx.sql("SELECT key, value From mytable")
val keys = rows.map(row => row.getInt(0))
JSON
val input = hiveCtx.jsonFile(inputFile)
RDD에서 가져오기
case class HappyPerson(handle: String, favouriteBeverage: String)
....
//HappyPerson을 하나 만들고 Schema RDD로 변환한다.
val happyPeopleRDD = sc.parallelize(List(HappyPerson("hodel", "coffee")))
//노트 :내부적 변환이 발생한다.
//즉, sqlCtx.createDataFrame(happyPeopleRDD) 와 동일하다.
happyPeopleRDD.registerTempTable("happy_people")

JDBC/ODBC 서버

  • 스파크 SQL은 BI 도구 등에서 스파크 클러스터에 접속하거나 클러스터를 다양한 사용자가 쓰는 것에 도움이 되도록 JDBC 기능을 제공한다.
  • 스파크 SQL의 JDBC 서버는 하이브의 HiveServer2와 연계된다. 이것은 쓰리프트 통신 프로토콜을 사용하므로 "쓰리프트 서버"로 알려져 있다

JDBC 서버구동

./sbin/start-thriftserver.sh --master sparkMaster

비라인으로 JDBC 서버 접속하기

./bin/beeline -u jdbc:hive2://localhost:10000
show tables;
예제 데이터를 읽어 들여서 테이블 만들기
- CREATE TABLE IF NOT EXIST mytable (key INT, value STRING)
ROW FORMAT DELIMITED FILEDS TERMINATED BY ',';

- LOAD DATA  LOCAL INPATH 'learning-spark-examples/files/int-string.csv'
   INTO TABLE mytable;

CACHE TABLE 테이블명을 쓴다. 나중에는 UNCACHE TABLE 테이블명

사용자 정의 함수

스파크 SQL UDF
스칼라 UDF 로 문자열 길이 구하기
hiveCtx.udf.register("strLenScala", (_:String).length)
val tweetLength = hiveCtx.sql("SELECT strLenScala('tweet') FROM tweets LIMIT 10")

하이브 UDF 스파크 SQL은 또한 기존의 하이브 UDF를 쓸 수도 있다.

스파크 SQL 성능

성능 최적화 옵션

  • codegen기능은 실행을 위한 특별한 코드를 생성하므로 오래 걸리는 쿼리들이나 반복적이 쿼리들을 상당히 빠르게 실행시키게 해준다. codegen 기능은 아직 실험적 기능의 단계이지만 큰 규모의 쿼리나 자주 실행되는 반복적인 쿼리에 대해서는 적용을 추천한다.
  • 최적화에 신경 쓸 필요가 있는 다른 옵션은 spark.sql.inMemoryColumnarStorage.batchSize이다. 데이터프레임을 캐시할 때 스파크 SQL은 RDD의 레코드들을 옵션에 주어진 배치 사이즈 만큼씩 묶어서 각 묶음을 압축한다.
Clone this wiki locally