-
Notifications
You must be signed in to change notification settings - Fork 0
Thomas
- 스파크 코어 : 작업 스케줄링, 메모리 관리, 장애 복구, 저장 장치와의 연동 등 기본 적인 기능
- 스파크 SQL : 정형 데이터를 처리하기 위한 스파크 패키지. SQL, 하이브 테이블, Parquet, JSON 등 지원
- 스파크 스트리밍 : 실시간 데이터 스트림을 처리하는 컴포넌트
- MLlib : 머신 러닝 기능을 가진 라이브러리
- 그래프X : 그래프를 다루기 위한 라이브러리
- 클러스터 매니저 : YARN, Apache Mesos, Standalone Scheduler
- 2009 UC Berkeley RAD 연구실(이후 AMPLab)의 연구 프로젝트
- 2010 Open Source
- 2013 ASF
- HDFS, 로컬 파일 시스템, AWS S3, Cassandra, Hive, HBase 등 하둡 API를 사용하는 저장 시스템을 지원함.
- 텍스트 파일, 시퀀스 파일, Avro, Parquet, 다른 하둡의 InputFormat이 지원하는 파일까지 지원함.
- 드라이버 프로그램 : 애플리케이션의 main 함수를 갖고, 클러스터의 분산 데이터세트를 정의하고 연산 작업을 수행한다.
- SparkContext 객체 : 연산 클러스터에 대한 연결. 드라이버 프로그램이 이를 통해 스파크에 접속함.
https://spark.apache.org 에서 다운로드
파이선 셸
bin/pyspark
스칼라 셸
bin/spark-shell
- conf/log4j.properties 파일로 로그 레벨 수정
- MacOSX에서는 hosts 파일에 127.0.0.1 에 대한 호스트 설정 필요함
직접 SparkContext 객체를 초기화해줘야함.
RDD : 탄력적인 분산 데이터세트(Resilient Distributed Dataset).
- 분산되어있는 변경 불가능한 객체 모음
- 여러개의 파티션으로 나뉨
- 연산
- 트랜스포메이션
- 액션
-
외부 데이터세트의 로드
val lines = sc.textFile("/path/to/file")
-
직접 만든 드라이버 프로그램에서 데이터 집합을 병렬화
val lines = sc.parallelize(List("pandas", "i like pandas"))
새로운 RDD를 만들어 돌려주는 연산
val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("error"))
- map() : 각 요소에 함수 적용한 결과 RDD 리턴
- filter() : 함수의 조건을 통과한 값으로만 이루어진 RDD 리턴
- flatMap() : 각 요소에 함수 적용한 결과의 반복자 내용으로 이루어진 RDD 리턴
- distinct() : 중복 제거
- sample(withReplacement, fraction, [seed]) : 복원/비복원 추출로 표본을 뽑아냄
- union()
- intersection()
- subtract()
- cartesian()
(p.48)
드라이버 프로그램에 최종 결과 값을 되돌려 주거나 외부 저장소에 기록하는 연산
errorsRDD.take(10).foreach(println)
- reduce(func) : 병렬로 병합
- fold(zero)(func) : reduce w/ zero val.
- aggregate(zeroValue)(seqOp, combOp) : 다른 타립 리턴
- take(num) : RDD 값 중 num 개
- collect() : 모든 데이터 요소
- count() : 요소 개수
- countByValue() : 각 값의 갯수
- top(num) : 상위 num 개
- takeOrdered(num)(ordering) : ordering 기준 상위 num 개
- takeSample(withReplacement, num, [seed]) : 무작위 값
- foreach(func) : 각 값에 func 적용
(p.52)
- 인라인으로 정의된 함수
- 메소드에 대한 참조
- 정적 함수
함수나 참조하는 데이터들이 직렬화 가능해야함. 메소드나 필드는 전체 객체에 대한 참조도 포함됨
스칼라는 묵시적 변환. 단, import org.apache.spark.SparkContext._
필요함.
(p.53)
- 동일한 RDD를 여러 번 사용하는 경우 모든 의존성을 재연산하게 되는 것을 피하기 위한 방법.
- 영속화 요청시 RDD를 계산한 노드들은 그 파티션들을 저장하고 있음.
- LRU 정책
- persist()
- unpersist()
Pair RDD : 키/값 쌍을 가지고 있는 RDD
-
일반 RDD로 부터 생성
val pairs = lines.map(x => (x.split(" ")(0), x))
-
메모리 상의 페어 데이터세트로 부터 생성 : SparkContext.parallelize()
scala> val data = List((1,2),(3,4),(3,6)) data: List[(Int, Int)] = List((1,2), (3,4), (3,6)) scala> val rdd0 = sc.parallelize(data) rdd0: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:26
-
reduceByKey(func) : 동일 키에 대한 값을 병합
scala> rdd0.reduceByKey((x,y) => x+y).collect() res1: Array[(Int, Int)] = Array((1,2), (3,10))
-
groupByKey() : 동일 키에 대한 값
scala> rdd0.groupByKey().collect() res2: Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(2)), (3,CompactBuffer(4, 6)))
-
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) : 다른 결과 타입, 동일 키의 값을 병함
val input = sc.parallelize(List(("coffee", 1),("coffee", 1),("coffee", 2),("panda", 3),("coffee", 9))) val result = input.combineByKey( (v) => (v, 1), (acc:(Int,Int), v) => (acc._1 + v, acc._2 + 1), (acc1:(Int,Int), acc2:(Int,Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) ).map{ case (key, value) => (key, value._1 / value._2.toFloat) } result.collectAsMap().map(println(_))
(coffee,3.25) (panda,3.0) res1: Iterable[Unit] = ArrayBuffer((), ())
-
mapValues(func) : 키 변경없이 각 값에 함수 적용
scala> rdd0.mapValues(x => x+1).collect() res4: Array[(Int, Int)] = Array((1,3), (3,5), (3,7))
-
flatMapValues(func) : 각 값에 함수를 적용한 리턴 반복자의 값들에 기존 키로 키/값 쌍을 만든다
scala> rdd0.flatMapValues(x => (x to 5)).collect() res5: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (3,4), (3,5))
-
keys()
scala> rdd0.keys().collect() res6: Array[Int] = Array(1, 3, 3)
-
values()
scala> rdd0.values().collect() res7: Array[Int] = Array(2, 4, 6)
-
sortByKey()
scala> rdd0.sortByKey().collect() res8: Array[(Int, Int)] = Array((1,2), (3,4), (3,6))
-
subtractByKey
scala> rdd0.subtractByKey(other).collect() res9: Array[(Int, Int)] = Array((1,2))
-
join
scala> rdd0.join(other).collect() res10: Array[(Int, (Int, Int))] = Array((3,(4,9)), (3,(6,9)))
-
rightOuterJoin
scala> rdd0.rightOuterJoin(other).collect() res13: Array[(Int, (Option[Int], Int))] = Array((3,(Some(4),9)), (3,(Some(6),9)))
-
leftOuterJoin
scala> rdd0.leftOuterJoin(other).collect() res14: Array[(Int, (Int, Option[Int]))] = Array((1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9))))
-
cogroup : 동일 키에 대해 RDD를 그룹
scala> rdd0.cogroup(other).collect() res15: Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((1,(CompactBuffer(2),CompactBuffer())), (3,(CompactBuffer(4, 6),CompactBuffer(9))))
-
countByKey() : 각 키에 대한 값 갯수
scala> rdd0.countByKey() res1: scala.collection.Map[Int,Long] = Map(1 -> 1, 3 -> 2)
-
collectAsMap() : 결과를 맵 형태로 모음
scala> rdd0.collectAsMap() res2: scala.collection.Map[Int,Int] = Map(1 -> 2, 3 -> 6)
-
lookup(key) : 키에 대한 모든 값
scala> rdd0.lookup(3) res3: Seq[Int] = WrappedArray(4, 6)
데이터세트가 여러번 재활용 될 경우, 파티셔닝 제어 방법을 선택이 유용할 수 있음
-
파티셔너 지정
sc.sequenceFile[UserID, UserInfo]("...").partitionBy(new HashPartitioner(100)).persist()
-
도움이 되는 연산
cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), lookup()
-
결과 RDD에 파티셔너가 지정되는 연산
cogorup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort()
-
부모 RDD가 파티셔너 가진 경우 지정되는 연산
mapValues(), flatMapValues(), filter()
-
사용자 정의 파티셔너
org.apache.spark.Partitioner 클래스를 상속하고 다음 메소드를 구현
- numPartitions: Int
- getPartition(key: Any): Int
- equals()
데이터 불러오기/저장하기
RDD에 텍스트 파일 하나를 읽어들이면, 각 라인이 RDD의 개별 데이터로 들어감.
여러개의 파일을 페어 RDD로 읽을 수 있음.(파일명, 파일 내용)
- 한개 파일, 디렉토리내 여러 파일을 불러오기
val input = sc.textFile("file:///Users/thomas/spark/README.md")
val input = sc.textFile("file:///Users/thomas/testfiles/")
-
파일명, 파일 내용
형태의 페어 RDD로 여러개 파일 불러오기
val input = sc.wholeTextFiles("file:///Users/thomas/testfiles")
val resul t= inut.mapValues( y =>
val nums = y.split(" ").map(x => x.toDouble)
nums.sum / nums.size.toDouble
)
- 디렉토리 내의 여러 개의 파일로 저장히기
sc.saveAsTextFile("file:///Users/thomas/testoutputfile")
//NOT WORK!!!
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature
case class Person(name: String, lovesPandas: Boolean)
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
val input = sc.textFile("file:///Users/thomas/test.json")
val result = input.flatMap(record => {
try {
Some(mapper.readValue(record, classOf[Person]))
} catch {
case e: Exception => None
}
})
//NOT WORK!!!
result.map(mapper.writeValueAsString(_)).saveAsTextFile(outputFile)
import java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
val input = sc.textFile("file:///Users/thomas/test.csv")
val result = input.map{ line =>
val reader = new CSVReader(new StringReader(line))
reader.readNext()
}
pandaLovers.map{person => List(person.name, person.favoriteAnimal).toArray}
.mapPartitions{people =>
val stringWriter = new StringWriter
val csvWriter = new CSVWriter(stringWriter)
csvWriter.writeAll(people.toList)
Iterator(stringWriter.toString)
}.saveAsTextFile(outFile)
val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWriteable]).
map{case (x, y) => (x.toString, y.get())}
- 로컬
- AWS S3
- HDFS
hive-site.xml
을 스파크의 conf
디렉토리에 복사해야 함
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new HiveContext(sc)
val rows = hiveCtx.sql("SELECT name, age FROM users")
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new HiveContext(sc)
val tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
val result = hiveCtx.sql("SELECT user.name, text FROM tweets")
- JDBC
- Cassandra
- HBase
- ElasticSearch
- 공유변수 - accumulator, broadcast variable
sc.accumulator(initialValue) // deprecated
sc.longAccumulator
sc.longAccumulator("My Acc")
sc.doubleAccumulator
// 사용예 - 빈 라인 갯수 세기
val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0)
val callSigns = file.flatMap(line => {
if (line == "") {
blankLines += 1
}
line.split(" ")
})
callSigns.saveAsTextFile("output.txt")
println("Blank lines: " + blankLines.value)
- 태스크 관점에서는 쓰기 전용 변수
- 장애 복구 등으로 태스크가 재실행되면, 트랜스포메이션 내에서는 업데이트가 여러번 발생할 수 있음(v1.3.0). 따라서, 액션 내에서 사용해야함.
- 사용자 정의 어큐뮬레이터를 만들 수 있음
- AccumulatorParam(deprecated) or AccumulatorV2 확장 필요
- 교환 법칙, 결합 법칙이 성립하는 연산이어야 함
// 사용자 정의 어큐뮬레이터 예제
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
private val myVector: MyVector = MyVector.createZeroVector
def reset(): Unit = {
myVector.reset()
}
def add(v: MyVector): Unit = {
myVector.add(v)
}
...
}
// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")
sc.broadcast(variable)
- 크고 읽기 전용 변수를 공유하는 방법으로 노드에 한번 씩만 전송(일반 변수는 매 연산마다 전송)
- 직렬화 작업이 튜닝 포인트
- 빠르고 작은 크기의 직렬화 포맷 선택
- 다른 직렬화 라이브러리 사용(spark.serializer 사용)
각 파티션들에서 한 번만 코드를 실행하게 하여 아이템에 대한 셋업 절차의 반복을 피함.
- mapPartitions()
- mapPartitionsWithIndex()
- foreachPartition()
-
유닉스 표준 입출력 스트림
-
RDD 데이터를 표준 입력을 String으로 읽거나 표준 출력으로부터 String으로 쓸 수 있음.
pipe(scriptFile) // 파이프로 전달 sc.addFile(scriptFile) // 사용할 스크립트 파일 리스트. 각 노드에 전달됨
// 사용예
val distScript = "./script/R/finddistance.R"
val distScriptName = "finddistance.R"
sc.addFile(distScript)
val distances = contactsContactLists.values.flatMap(x => x.map(y =>
s"$y.contactlat,$y.contactlong,$y.mylat,$y.mylong")).pipe(Seq(
SparkFiles.get(distScriptName)))
println(distances.collect().toList)
수치 데이터 RDD에 대해서 통계 연산을 제공
stats() // StatsCounter 객체 리턴
- count() : 데이터 갯수
- mean() : 데이터 값 평균
- sum() : 총합
- max() : 최대값
- min() : 최소값
- variance() : 분산
- sampleVariance() : 표본값 분산
- stdev() : 표준편차
- sampleStdev() : 표본값 표준편차
val distanceDouble = distince.map(string => string.toDouble)
val stats = distanceDoubles.stats()
val stddev = stats.stdev
val mean = stats.mean
val reasonableDistances = distanceDoubles.filter(x => math.abs(x-mean) < 3 * stddev)
println(reasonableDistances.collect().toList)
클러스터 운영
마스터(중앙 조정자, 드라이버) - 슬래이브(분산 작업 노드, 익스큐터) 구조
스파크 드라이버 -> 클러스터 매니저(메소스/얀/단독 매니저) -> 클러스터 작업자(익스큐터), ...
- 드라이버 : main() 메소드가 실행되는 프로세스
- 사용자 프로그램을 태스크(스파크 작업 계층에서 가장 작은 단위의 개체)로 변환
- 익스큐터에서 태스크 스케줄 조정 : 드라이버가 되도록 데이터 위치에 기반하여 가장 적절한 익스큐터에서 실행하도록 조정함
- 익스큐터 : 개별 태스크를 실행하는 작업 실행 프로세스
- 작업들을 실행하여 드라이버에 결과 리턴
- RDD 캐시(Block Manager) : 익스큐터 내부에 직접 캐시됨
- 클러스터 매니저
- 익스큐터 및 드라이버 실행
- 사용자 :
spark-submit
사용하여 애플리케이션 제출 -
spark-submit
: 드라이버 프로그램 실행, main() 메소드 호출 - 드라이버 프로그램 : 클러스터 매니저에게 익스큐터 실행을 위한 리소스 요청
- 클러스터 매니저 : 익스큐터 실행
- 드라이버 : 작업을 나눠서 익스큐터에 보낸다
- 익스큐터 : 단위 작업 실행
- main() 종료 or SparkContext.stop() 호출시 익스큐터 종료 및 자원 반환
bin/spark-submit [options] <app jar | python file> [app options]
-
--master
: 접속할 클러스터 매니저-
spark://host:port
: 단독 클러스터. 기본 포트는 7077 -
mesos://host:port
: 메소스 클러스터. 메소스 마스터는 보통 5050 포트 사용. -
yarn
: 얀 클러스터. HADOOP_CONF_DIR 환경 변수에 하둡 설정 디렉토리를 설정해주어야함. -
local
: 로컬 모드에서 싱클 코어로 실행. -
local[N]
: 로컬 모드에서 N개 코어로 실행. -
local[*]
: 로컬 모드에서 머신이 가지고 있는 만큼의 코어로 실행.
-
-
--deploy-mode
: 지역적으로 실행("client") / 클러스터 작업 머신에서 실행("cluster") 선택. 기본은 client -
--class
: main이 있는 클래스 지정 -
--name
: 애플리케이션 이름 지정 -
--jars
: 클래스패스에 있어야할 파일 목록 -
--py-files
: PYTHONPATH에 추가할 파일 목록 -
--executor-memory
: 익스큐터가 사용할 메모리 -
--driver-memory
: 드라이버 프로세스가 사용할 메모리
옵션은 --conf 속성=값
이나 --properties-file
로도 지정 가능
- Maven : uber JAR
- sbt : assembly JAR
- 의존성 충돌 : shading으로 해결
- 클러스터 매니저가 우선순위나 처리 용량 한계에 따른 작업 큐들을 정의하고 스파크는 작업을 큐에 제출함
- 페어 스케줄러 : 장시간 동작 애플리케이션(JDBC 서버 등)을 위한 큐 제공
- 스파크 바이너리를 모든 머신의 동일한 위치에 복사
- 마스터 머신에서 다른 머신에 패스워드 없이 ssh 접속 가능하도록 설정
-
conf/slaves
에 마스터와 작업 노드들의 호스트 이름을 모두 입력 -
sbin/start-all.sh
/sbin/stop-all.sh
spark-submit --master spark://masternode:7077 <app> // 제출
spark-shell --master spark://masternode:7077 // 쉘
- 배포모드
- 클라이언트 모드(Default) : 제출한 머신에서 드라이버 실행
- 클러스터 모드 :
--deploy-mode cluster
- 자원 사용량
- 익스큐터 메모리 :
--executor-memory
- 코어 수 :
--total-executor-cores
/spark.cores.max
- 익스큐터 메모리 :
- 마스터 노드 HA를 위해서는 주키퍼 사용
export HADOOP_CONF_DIR="..."
spark-submit --master yarn <app>
- 배포모드 : 클라이언트 / 클러스터
- 자원 사용량
- 익스큐터 수 :
--num-executors
- 익스큐터 메모리 :
--executor-memory
- 코어 수 :
--executor-cores
- 익스큐터 수 :
spark-submit --master mesos://masternode:5050 <app>
spark-submit --master mesos://zk://node1:2181/mesos,node2:2181/mesos,node3:2181/mesos <app> // zookeeper
- 배포모드 : 클라이언트 모드
- 자원 사용량
- 익스큐터 메모리 :
--executor-memory
- 코어 수 :
--total-executor-cores
- 익스큐터 메모리 :
- 스케줄링 모드
- fine-grained(Default) : 동적으로 CPU 자원 공유
- coarse-grained : 고정된 CPU 개수 할당
최적화 및 디버깅 참고 : 최적화 가이드
val conf = new SparkConf()
conf.set("spark.app.name", "My Spark App")
conf.set("spark.master", "local[4]")
conf.set("spark.ui.port", "36000")
var sc = new SparkContext(conf)
-
설정 우선 순위
- 코드의 SparkConf 객체
- spark-submit 실행시 플래그
- 설정 파일
- 기본값
-
spark-submit
--conf
conf/spark-defaults.conf
--properties-file
- RDD 가계도
rdd.toDebugString
val input = sc.textFile("/path/to/input.txt")
val tokenized = input.filter(line => line.size > 0).map(line => line.split(" "))
val counts = tokenized.map(words => (words(0), 1)).reduceByKey{ (a,b) => a+b }
input.toDebugString
counts.toDebugString
counts.collect()
input: org.apache.spark.rdd.RDD[String] = /Users/ckjun/tmp/input.txt MapPartitionsRDD[251] at textFile at <console>:123
tokenized: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[253] at map at <console>:125
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[255] at reduceByKey at <console>:127
res47: String =
(2) /path/to/input.txt MapPartitionsRDD[251] at textFile at <console>:123 []
| /path/to/input.txt HadoopRDD[250] at textFile at <console>:123 []
res48: String =
(2) ShuffledRDD[255] at reduceByKey at <console>:127 []
+-(2) MapPartitionsRDD[254] at map at <console>:127 []
| MapPartitionsRDD[253] at map at <console>:125 []
| MapPartitionsRDD[252] at filter at <console>:125 []
| /path/to/input.txt MapPartitionsRDD[251] at textFile at <console>:123 []
| /path/to/input.txt HadoopRDD[250] at textFile at <console>:123 []
res49: Array[(String, Int)] = Array((ERROR,1), (INFO,4), (WARN,2))
-
Spark Web UI : 4040 포트
- Jobs : 진행상황, 작업 단계, 태스크 관련
- 성능 이슈 확인(비대칭 성능, 단계별 수행시간)
- Storage : 영속화 RDD 정보
- Executors : 익스큐터 목록
- 자원 사용량
- 익스큐터 별 실패율
- Thread Dump
- Environment : 설정
- Jobs : 진행상황, 작업 단계, 태스크 관련
-
로그
- 병렬화 수준 조정 : repartition() / coalesce()
- 직렬화 포맷 : Kyro 등
- 메모리
- RDD 저장/셔플,집합 연산/사용자 코드 비율 조정
- RDD 캐시 레벨 조정(디스크 사용)
- RDD 캐시 레벨 조정(직렬화 사용) - GC
- 하드웨어
- 많을 수록 좋음
- 익스큐터는 작을 수록 좋음(GC, 64G이하)
스파크 SQL
- DataFrame
- 다양한 구조적 포맷 지원
- DB 연결
- 하이브 의존성 포함 or 미포함
- HiveContext / SQLContext
- 하이브 의존성 포함하더라도 하이브가 필요한 것은 아님.
- HiveQL Manual
- 하이브에 연결하려면
$SPARK_HOME/conf/hive-site.xml
파일 위치해야함. - 기존 하이브가 없는 경우 다음 디렉토리 생성함.
-
metastore_db
: 하이브 메타스토어 -
/user/hive/warehouse
: 테이블
-
// 하이브 의존
import org.apache.spark.sql.hive.HiveContext
// 하이브 의존성 제거시
//import org.apache.spark.sql.SQLContext
// SQL 컨텍스트 생성
val hiveCtx = new HiveContext(sc)
// 내부 변환 모듈 임포트
import hiveCtx.implicits._
// 데이터 읽고 질의하기
val input = hiveCtx.jsonFile("/Users/meye/tmp/testweet.json")
input.registerTempTable("tweets")
val topTweets = hiveCtx.sql("SELECT text,retweetCount FROM tweets ORDER BY retweetCount LIMIT 10")
- 기본연산
- show() : 내용
- select() : 지정한 필드나 함수 결과
- filter() : 조건에 맞는 레코드
- groupBy() : 컬럼에 따라 그룹화
- min(), max(), mean(), agg()
- RDD
- rdd() : RDD로 변환
- getType() : 컬럼을 각 타입별 캐스트(ex. getString(0))
- cacheTable() / cache()
-
CACHE TABLE table
/UNCACHE TABLE table
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new HiveContext(sc)
val rows = hiveCtx.sql("SELECT k,v FROM someTable")
val keys = rows.map(_.getInt(0))
//Load
hiveCtx.load(parquetFile, "parquet")
//Save
val rows = hiveCtx.sql("...")
rows.save("hdfs://...","parquet")
Apache Avro 패키지를 프로젝트에 추가하여 사용
//Load
hiveCtx.load(parquetFile, "com.databricks.spark.avro")
//Save
val rows = hiveCtx.sql("...")
rows.save("hdfs://...","com.databricks.spark.avro")
hiveCtx.jsonFile(inputFile)
- printSchema()
묵시적 변환으로 스키마 추측함.(Scala)
case class HappyPerson(handle: String, favouriteBeverage: String)
val happyPeopleRDD = sc.parallelize(List(HappyPerson("holden", "coffee")))
happyPeopleRDD.registerTempTable("happy_people)
- JDBC 서버 : HiveServer2(
$SPARK_HOME/sbin/start-thriftserver.sh
)./sbin/start-thriftserver.sh --master sparkMaster
- JDBC 클라이언트 : beeline(
$SPARK_HOME/bin/beeline
)./bin/beeline -u jdbc:hive2://localhost:10000
- JDBC 서버를 사용하는 경우, 여러 프로그램 사이에서 캐시된 테이블을 공유할 수 있다.
- 단독 스파크 SQL 쉘 : 로컬에서 SQL을 사용하기 위한 쉘(
./bin/spark-sql
)
- 스파크 SQL UDF
// 문자열 길이 구하기 UDF hiveCtx.register("strLenScala", (_: String).length) val len = hiveCtx.sql("SELECT strLenScala('tweet') FROM tweets LIMIT 10")
- 하이브 UDF
- 표준 하이브 UDF는 자동으로 불러와 사용 가능
- 직접 작성한 UDF는 jar 포함시 사용 가능
hiveCtx.sql("CREATE TEMPORARY FUNCTION name AS class.function")
- 여러 컬럼의 개별 합계를 구하는 등의 조건부 집합 연산에 최적화(메모리 기반 컬럼 지향 저장소)
SELECT SUM(user.favouritesCount), SUM(retweetCount), user.id FROM tweets GROUP BY user.id
- 최적화 옵션
- spark.sql.codegen : 쿼리문 실행 시에 바이트코드 컴파일 (false)
- spark.sql.inMemoryColumnarStorage.compressed : 컬럼 지향 포맷을 압축 (true)
- spark.sql.inMemoryColumnarStorage.batchSize : 캐싱 배치 사이즈 (1000)
- spark.sql.parquet.compression.codec : 사용할 압축 코덱. uncompressed,snappy,gzip,lzo (snappy)