-
Notifications
You must be signed in to change notification settings - Fork 0
lukas
- RDD는 분산되어 존재하는 변경 불가능한 객체 모음
- RDD는 사용자 정의 클래스를 포함해 파이썬, 자바, 스칼라의 어떤 타입의 객체든 가질 수 있다.
- 두 가지 타입의 연산을 지원
- 트랜스포메이션
- 액션
- 스파크는 RDD를 여유로운 방식(lazy evaluation) 으로 처음 액션을 사용하는 시점에 처리한다.
- 새로운 RDD를 만들어 돌려주는 연산
- 실제로 액션이 사용되는 시점에 계산됨
- RDD는 변경 불가능한것
- map, flatmap, union, intersect, subtract, distinct, cartesian, sample 등이 있음
- 프로그램에 최종 결과 값을 돌려주거나 외부 저장소에 값을 기록하는 연산 작업
- 액션이 실행될 때마다 새로 연산
- 여러 액션에서 RDD 하나를 재사용하고 싶으면 RDD.persist()를 사용하여 계속 결과를 유지하도록 요청
- reduce, fold, aggregate, collect, take, top, takesample 등
- 스파크는 키/값 쌍을 가지고 있는 RDD에 대해 특수한 연산을 제공 : 페어 RDD
- 페어 RDD들은 각 키에 대해 병렬로 처리하거나 네트워크상에서 데이터를 다시 그룹핑하게 해 준다.
파이선/스칼라 map, 자바 mapToPair
- 기본 RDD에서 가능한 모든 트랜스포메이션을 사용
- 페어 RDD의 트랜스포메이션 -- reduceByKey(func), groupByKey, CombineByKey, mapValues(func), flatMapValues(func), keys, values, sortByKey
- 두 페어 RDD에 대한 트랜스포메이션 -- subtractByKey, join, rightOuterJoin, leftOuterJoin, cogroup
-
fold, aggregate, reduce
-
combineByKey
- 한 파티션 내의 데이터들을 하나씩 처리
- 새로운 키라면 createCombiner 함수를 사용해 해당 키에 대한 어큐뮬레이터의 초기값을 만듬
- 중복된 키라면 mergeValue 함수를 사용해 해당 키에 대한 어큐뮬레이터의 현재 값과 새로운 값에 적용해서 합침
- 각 파티션으로부터 결과를 최종적으로 합칠 때 둘 이상의 파티션이 동일 키에 대한 어큐뮬레이터를 갖고 있다면 mergeCombiners를 써서 함침
-
병렬화 수준 최적화
- groupByKey
- RDD의 키를 사용해서 데이터를 그룹화하여 [K, Iterable[V]]타입의 RDD를 리턴
- cogroup
- 여러 RDD에 동일 키를 공유해 데이터를 그룹화, [K, (Iterable[V], Iterable[W])]타입의 RDD를 리턴
- 내부 조인(join), 양쪽 RDD에 모두 존재하는 키만 결과
- leftOuterJoin, 원본 RDD에 있는 각 키의 엔트리들은 가지고 다른 쪽 페어 RDD의 값들은 option으로 표시
- rightOuterJoin, 키가 다른 쪽 RDD에 존재해야 하며 원본 RDD에 option으로 표시
- sortByKey
- countByKey, collectAsMap, lookup
- 노드 간 데이터세트의 파티셔닝을 어떻게 제어할 것인가?
- 분산 프로그램에서 통신은 비용이 매우 크므로 네트워크 부하를 최소화할 수 있는 데이터 배치는 프로그램 성능을 비약적으로 향상
- 파티셔닝은 조인 같은 키 중심의 연산에서 데이터 세트가 여러 번 재활용될 때만 의미가 있다.
- 모든 RDD의 키/값 쌍에 대해 가능
- 시스템이 각 키에 제공된 함수에 따라 값들을 그룹화
- 파일 포맷과 파일 시스템
- NFS, HDFS, 아마존 S3같은 로컬/분산 파일 시스템이 저장한 데이터
- 스파크 SQL을 사용한 구조화된 데이터
- JSON이나 아파치 하이브 같은 구조회된 데이터 소스
- 데이터베이스와 키/값 저장소
- 카산드라, HBase, elasticsearch, JDBC 지원
- 범위 : 구조가 없는 텍스트 파일, 구조화된 JSON, 구조화 포맷인 시퀀스 파일
-
텍스트 파일 불러오기
- 한개 파일을 처리하는 방법
- input = sc.textFile("filepath")
- 여러 개의 파일을 포함한 디렉터리 형태로 들어오는 다중 입력은 두가지 방법으로 처리
- textFile 함수에 디렉터리 경로를 바로 전달해 주면 RDD에 모든 파일을 읽어 드림
- SparkContext.wholeTextFiles() 메소드를 써서 입력 파일명을 키로 하는 페어 RDD를 돌려 받을 수 있음
- 한개 파일을 처리하는 방법
-
텍스트 파일 저장하기 : result.saveASTextFile("outputFile")
- JSON 불러오기 텍스트 파일로 데이터를 로딩하고 JSON으로 파싱
- JSON 저장하기 RDD의 구조화된 데이터를 문자열 RDD로 변환하여 텍스트 파일로 저장
- CSV 불러오기 텍스트 파일로 데이터를 로딩하고 CSV 파싱 라이브러리를 사용하여 변환
- CSV 저장하기 JSON에서처럼 출력 인코딩 객체를 재사용해 처리
키/값 쌍의 비중첩 파일로 구성된 하둡의 파일 포맷 시퀀스 파일들은 동기화 표시를 갖고 있어서 스파크가 그 부분까지 파일 탑색을 했을 때 필요한 레코드 경계까지만 재동기화를 할 수 있게 해줌 여러 개의 노드에서 병렬로 시퀀스 파일들을 효율적으로 읽을 수 있게 해줌
- 시퀀스 파일 불러오기
- 스파크는 시퀀스 파일을 읽기 위한 특화된 API를 가지고 있음
- SparkContext에서 sequenceFile(path, keyClass, valueClass, minPartitions)를 호출
- 시퀀스 파일 저장하기
- 시퀀스 파일은 키/값 쌍으로 이루어지므로 출력을 위한 PairRDD가 필요
- 기본 타입을 쓰려면 그냥 PairRDD를 saveASSequenceFile(PATH)를 호출하기만 하면 데이터를 알아서 파일에 써준다.
- 만약 쓰는 키와 값에 대해 적절한 writable 타입이 존재하지 않거나 가변 길이 타입을 쓰고 싶다면 저장하기 전에 map을 사용하고 데이터를 변환
시퀀스 파일에 대해 단순한 포장을 더해 값만을 가진 RDD를 편법적인 방법으로 저장할 수 있게 함
- 다른 하둡 입력 포맷으로 불러오기
- hadoopFile : 예전 API로 구현된 하둡 입력 포맷
- newAPIHadoopFile : 새로운 하둡 API로 파일을 읽기
- 하둡 출력 포맷으로 저장하기
- saveAsHadoopFile
- 파일 시스템 외의 데이터 소스
- hadoopDataset/saveAsHadoopDataset, newAPIHadoopDataset/saveAsNewAPIHadoopDataset
- 하둡이 지원하는 파일 시스템이 아닌 저장소 포맷에 접근 가능, HBase,몽고DB 같은 종류의 키/값 저장소
디스크 공간과 네트워크 부하를 줄이기 위해 데이터를 압축할 필요가 있음,
대부분의 하둡 출력 포맷에는 데이터를 압축하는 데 쓸 압축 코덱을 지정할 수 있음
스파크는 로컬 파일 시스템에서 파일 불러오기를 지원하지만 파일들이 클러스터의 모든 노드에서 동일 경로에 있기를 요구
s3n://bucket/my-files/* 과 같은 형식으로 지원
하둡 분산 파일 시스템은 스파크가 잘 돌아가는 분산파일 시스템 hdfs://master:port/path의 식으로 지정
아파치 하이브가 지원하는 어떤 테이블이든 읽은 수 있음
JSON 데이터가 레코드마다 일관된 스키마를 갖고 있다면 스파크 SQL은 스키마를 예상하여 각 레코드를 불러올 수 있음
필요한 필드 데이터만 뽑아 오는 것을 쉽게 만들어 줄 수 있음
JDBC가 지원하는 어떤 관계형 DB의 데이터든 불러오기가 가능 jdbcRDD를 생성한 후 SparkContext와 다른 인자들을 전달해 주면 됨 jdbcRDD
- 데이터 베이스에 접속할 수 있는 함수를 제공
- 원하는 범위의 데이터를 읽을 질의와 함께 질의에 대한 속성으로 lowerBound와 upperBound를 넣어 준다.
- 각 레코드의 출력을 ResultSet에서 데이터를 처리하기 편한 포맷으로 바꿔 주기 위한 함수
커넥터가 스파크에 포함되어 있지 않으므로 쓰기 위해서는 빌드 파일에 추가 의존성을 설정해 줘야 한다. 카산드라는 아직 스파크 SQL을 쓰지는 않지만 CassandraRow객체를 포함한 RDD를 리턴해 주며 이는 스파크 SQL의 Row 객체와 동일한 메소드들을 갖고 있다.
TableInputFormat을 구현한 하둡 입력 포맷을 써서 HBase에 접근
스파크의 공유 변수인 어큐뮬레이터와 브로드캐스트 변수는 일반적인 통신 타입인 결과의 집합 연산과 브로드캐스팅에 대해 이런 제한을 풀어준다. 작업 노드에서 드라이버 프로그램으로 보내는 값의 집합 연산에 대해 간단한 문법을 제공
- 드라이버에서 SparkContext.accumulator(initialValue) 메소드를 호출하여 초기 값을 가진 어큐뮬레이터를 만든다. 반환 타입은 org.apache.spark.Accumulator[T] 객체이며 T는 초기값의 타입이다.
- 스파크 클로저의 작업 노드 코드에서 어큐뮬레이터에 += 메소드를 써서 값을 더한다.
- 드라이버 프로그램에서 value 속성을 불러 어큐뮬레이터의 값에 접근한다.
작업 노드의 태스크는 어큐뮬레이터의 value에 접근할 수 없다. 이 태스크의 관점에서 어큐뮬레이터는 쓰기 전용 변수인 셈
- 자동적으로 오류가 발생하거나 느려진 머신들에 대해 오류가 발생하거나 느려진 작업들을 재실행하는 방식으로 대응
- 액션에 사용되었던 어큐뮬레이터들에 대한 것이며 각 태스크의 업데이트는 스파크에 의해 각 어큐뮬레이터에 한 번씩만 반영된다.
- 액션이 아닌 RDD 트랜스포메이션에 사용되는 어큐뮬레이터에 대해서는 이런 보장을 할 수가 없다.
사용자 지정 어큐뮬레이터 사용자 정의 어큐뮬레이터는 AccumuatorParam을 확장해야 함.
스파크 연산에 쓸 크고 읽기 전용인 값을 모든 작업 노드에 효과적으로 전송하는데 쓴다.
- 기본적인 작업 실행 메커니즘이 작은 작업 사이즈에 최적화
- 병렬 작업에서 동일 변수를 사용할 수 있으므로 효과적이지 못한 면도 있음
직렬화를 최적화하려면 spark.serializer 속성을 써서 다른 직렬화 라이브러리를 선택하거나 사용하는 데이터 타입에 맞게 직접 구현
각 데이터 아이템에 대해 셋업 절차의 반복을 피하게 해준다. 스파크는 파티션 기반 버전의 map과 foreach를 제공하여 RDD의 각 파티션들에서 한 번만 코드를 실행하게 해 줌으로써 그런 작업들에 대한 비용을 줄여준다.
- mapPartitions : f(Iterator[T]) -> Iterator[T]
- mapPartitionsWithIndex : f(Int, Iterator[T]) -> Iterator[U]
- foreachPartition : f(Iterator[T]) -> Unit
스파크는 pipe 메소드를 통해 유닉스 표준 입출력 스트림으로 읽고 쓸 수 있는 모든 언어로 작업의 일부를 작성할 수 있다. pipe를 쓰면 각 RDD의 데이터를 표준 입력으로부터 String으로 읽어 들이는 트랜스포메이션을 만들수 있고 이 문자열을 원하는 대로 가겅하여 마찬가지로 표준 출력으로부터 결과를 String으로 써 줄 수 있다.
count : RDD가 갖고 있는 데이터 개수 mean : 데이터 값들의 평균 sun : 총합 max : 최대값 min : 최소값 variance : 값들의 분산 sampleVariance : 표본값에 대한 분산 stdev : 표준 편차 smapleStdev : 표본값에 대한 표준 편차
하나의 중앙 조정자와 여러 개의 분산 작업 노드로 구성되는 마스터/슬레이브 구조를 사용한다. 중앙 조정자는 드라이버라고 부른다. 드라이버는 익스큐터라고 불리는 다수의 분산 작업자들과 통신한다. 드라이버는 자신만의 자바 프로세스에서 돌아가며 각 익스큐터 또한 독립된 자바 프로세스이다. 하나의 드라이버와 익스큐터들을 합쳐서 스파크 애플리케이션이라고 부른다.
- 프로그램의 main() 메소드가 실행되는 프로세스
- SparkContext를 생성하고 RDD를 만들고 트랜스포메이션과 액션을 실행하는 사용자 코드를 실행하는 프로세스
드라이버가 실행될 때 드라이버는 두 가지 임무를 수행한다.
- 사용자 프로그램을 태스크로 변환
- 사용자 프로그램을 물리적 실행 단위가 되는 태스크로 바꿀 책임을 갖는다.
- 입력으로부터 RDD를 만들고 트랜스포메이션을 사용하여 새로운 RDD를 받아오며 데이터를 가져오거나 저장하기 위해 액션을 사용
- 익스큐터에서 태스크들의 스케줄링
- 물리적 실행 계획이 주어지면 스파크 드라이버는 익스큐터들에서의 개별 작업들을 위한 스케줄을 조정
- 익스큐터들은 시작하면서 드라이버에 등록을 하게 되므로 항상 애플리케이션의 실행에 대해 전체적으로 볼 수 있다.
- 주어진 스파크 작업의 개별 태스크들을 실행하는 작업 실행 프로세스
- 애플리케이션을 구성하는 작업들을 싱행하여 드라이버에 그 결과를 돌려준다.
- 각 익스큐터 안에 존재하는 블록 매니저라는 서비스를 통해 사용자 프로그램에서 캐시하는 RDD를 저장하기 위한 메모리 저장소를 제공
스파크는 익스큐터를 실행하거나 때로는 드라이버 실행을 위해서도 클러스터 매니저에 의존한다.
스파크는 사용자 프로그램을 스파크에 제출 할 수 있는 단일 스크립트인 spark-submit을 제공 다른 클러스터 매니저들에 접속할 수 있으며 사용자의 애플리케이션이 얼마나 많은 자원을 쓸지 조정할 수 있다.
스파크는 spark-submit이라는 모든 클러스터 매니저 간에 작업을 제출해 주는 단일 툴을 제공 spark-submit이 옵션 없이 스크립트 이름 혹은 jar 파일 이름만으로 호출된다면 단순히 로컬로 스파크 프로그램을 실행한다. -master 플래그는 접속할 클러스터의 주소를 지정
애플리케이션 실행에 대해 세밀한 설정을 할 수 있도록 다양한 옵션을 제공
- 작업에 원하는 만큼의 자원량 등을 설정하는 스케줄링 정보에 대한 것
- 모든 작업 머신에 배포하기 원하는 파일이나 라이브러리 같은 애플리케이션의 실행 의존성에 대한 정보
spark-submit [option] <app jar | python file> [app options]
sparkConf의 설정을 --conf 속성=값 스타일이나 각 속성의 키와 값을 포함한 파일을 --properties-file을 써서 지정해 주는 식으로 전달 가능하
서드파티 라이브러리를 설치하는 방법
- 는 작업 머신에 설치된 파이썬을 이용하므로 필요한 라이브러리들을 클러스터 머신들에 기본 파이썬 패키지 매니저를 써서 직접 설치하거나 수동으로 파이썬 설치 디렉터리의 site-package/ 밑에 넣어 줄 수 있다.
- spark-submit의 인자로 --py-files를 써서 개별 라이브러리를 제출하면 그것들이 파이썬 인터프리터 경로에 추가됨
- spark-submit에 --jars 플래그로 개별 JAR파일을 제출할 수 있음
- 전체적인 전이 의존성 그래프를 같이 전송, 직접적인 의존성 뿐만 아니라 그 라이브러리의 의존성, 의존성의 의존성 등등을 모두 포함
- 빌드 도구는 메이븐과 sbt, 양쪽 언어에 다 쓸 수 있지만 메이븐은 주로 자바를 위해, sbt는 스칼라 프로젝트를 위해 쓴다.
다중 접속 환경 클러스터에서의 스파크는 기본적으로 클러스터 매니저가 스파크 애플리케이션들 사이에서 자원을 공유/관리해 주는 기능에 의존하여 스케줄링 스파크 애플리케이션이 클러스터 매니저로부터 익스큐터를 요청하면 가능한 상태나 자원 경쟁 여부에 따라 적당한 개수의 익스큐터가 주어짐
몇 대 정도의 머신 위에서 스파크 자체만 돌린다면 내장된 단독 모드가 설정하기 가장 쉬운 방법 다른 분산 애플리케이션과 공유해서 쓰는 클러스터를 사용한다면 스파크는 두 가지 인기 클러스터 매니저와도 운영 가능
스파크의 단독 클러스터 매니저는 클러스터에서 애플리케이션을 실행하는 간단한 방법을 제공, 이는 하나의 mastor와 여러개의 작업자로 구성
- 직접 마스터와 작업자를 실행하거나 스파크의 sbin디렉터리의 실행 스크립트를 써서 시작, 실행 스크립트는 쓰기에 가장 쉽지만, 머신 간에 SSH 접속 설정이 필요하며 현재는 맥 OS X와 리눅스에서만 가능
- 애플리케이션을 단독 클러스터 매니저에 제출하려면 spark-submit의 마스터 옵션을 사용
- spark-shell이나 pyspark를 실행할 때도 --master 인자를 써서 동일한 방법으로 시작할 수 있다.
- 단독 클러스터 매니저는 애플리케이션을 실행하는 드라이버가 어디에서 실행되는지에 대해 두 가지의 배포 모드를 지원
- 클라이언트 모드에서 드라이버는 spark-submit을 실행하는 머신에서 spark-submit의 일부로 실행
- 클러스터 모드에서는 드라이버가 단독 클러스터 내에서 작업 노드들 중 하나에서 별개의 프로세스로 실행된 후 익스큐터 요청을 위해 다시 연결
- 스파크를 여러 애플리케이션이 공유해서 사용할 때 익스큐터들 사이에 자원을 얼마나 할당할지 결정할 필요가 있다.
- 익스큐터 메모리 : spark-submit 실행 시 --executor-memory를 써서 이를 지정, 기본 1GB
- 코어 수의 최대 총합 : spark-submit에 --total-executor-cores, 기본 설정은 무한대
- 아파치 주키퍼를 써서 여러 개의 대기 마스터 노드들을 유지하다가 문제가 생기면 그것들 중 하나로 대체하게 해 준다.
다양한 데이터 처리 프레임워크들이 공유된 자원 풀을 쓸 수 있도록 하기 위해 하둡 2.0에서 처음 소개되었으며 대개 HDFS와 함께 설치 하둡 설정 디렉터리를 가리키도록 환경 변수를 설정하고 spark-submit에 특별한 마스터 URL로 작업을 제출하면 된다.
- 익스큐터 개수 : --num-executors 플래그로 지정해 준 개수의 익스큐터만을 사용, 기본 설정 2
- 메모리 / 코어 수 : --executor-memory를 써서 메모리 지정, --executor-cores를 사용하여 코어 개수를 설정
- 클러스터에서 분석 워크로드나 장시간 동작 서비스 모두 사용할 수 있게 해 주는 범용 목적의 클러스터 매니저 spark-submit 에 mesos://로 시작하는 URI를 전달
- 주키퍼를 써서 멀티 마스터 모드에서 주키퍼가 마스터를 고르도록 메소스 클러스터를 설정할 수 있음 mesos://zk://로 시작하는 URI를 써서 주키퍼 노드들의 목록을 명시해 주도록 한다.
메소스는 동일 클러스터 내에서 자원을 공유할 수 있는 두가지 모드를 지원 fine-grained 모드는 기본 모드이며 실행하는 단위 작업들에 따라 익스큐터가 메소스에 요청하는 CPU 개수를 조정하므로 여러 개의 익스큐터를 한 머신에서 실행하더라도 익스큐터들 사이에 동적으로 자원을 공유할 수 있다. coarse-grained 모드는 각 익스큐터들 고정된 개수의 CPU를 할당하고 애플리케이션이 종료하기 전까지 심지어 익스큐터가 아무 일도 하지 않아도 결코 반환하지 않는다. spark-submit을 실행할 때 --conf spark.mesos.coarse=ture를 전달해서 거친 모드를 활성화
메소스에서 스파크를 실행할 때도 애플리케이션을 오직 클라이언트 배포 모드에서만 실행하는 것을 지원 메소스 클러스터 내부에서 실행하고 싶다면 오로라나 크로노스 같은 프레임워크가 임의의 스크립트를 메소스 안에서 실행하고 모니터링할 수있게 해준다.
- 익스큐터 메모리 : spark-submit 실행 시 --executor-memory를 써서 이를 지정
- 코어 수의 최대 총합 : spark-submit에 --total-executor-cores
스파크에는 아마존 EC2에서 클러스터를 실행할 수 있는 내장 스크립트가 들어있다. HDFS, 타키온, 클러스터를 모니터링하기 위한 강글리아 등의 지원 서비스도 설정
접속키 ID와 비밀 접속키를 설정, EC2 ssh 키 쌍을 만들고 개인 키 파일을 다운로드하여 ssh 접속이 가능하도록 한다. spark-ec2 스크립트의 launch 명령을 실행하면서 키 쌍 이름, 개인 키 파일, 클러스터 이름을 지정
- 스파크의 주된 설정 메커니즘은 sparkConf클래스
- 사용자가 재정의해서 쓸 수 있는 설정 옵션들에 대한 키와 값의 쌍들을 갖고 있음
- spark-submit도구를 써서 동적으로 설정값을 지정할 수 있음, 애플리케이션이 spark-submit으로 설행될 때 설정값들은 그 실행환경에 주입
- --conf 플래그도 제공,
- spark-submit은 설정값을 파일에서 읽는 것도 지원, 기본적으로 conf/spark-defaults.conf를 사용 --properties-file을 써서 변경 가능 간혹 동일한 설정 속성값이 여러 곳에 지정되는 경우 사용자 코드에서 sparkConf객체 설정 > spark-submit에 전달되는 플래그 > 설정 파일 > 기본값
- 데이터를 셔플하는 데에 쓸 로컬 저장 디렉터리를 지정하기 위해서는 SPARK_LOCAL_DIRS 환경 변수를 conf/spark-env.sh안에 경로들을 export해야 함
스파크는 실행하면서 논리적 표현들을 여러 개의 연산들을 태스트로 합쳐서 물리적인 실행 계획으로 바꾼다.
val input = sc.textFile("input.txt")
val tokenized = input.filter(line => line.size > 0 ).map(line => line.split(" "))
val counts = tokenized.map(wards => (words(0), 1).reduceByKey{ (a,b) => a+b }
이 명령어들의 결과 RDD인 counts는 각 로그 레벨 메시지의 개수를 갖고 있게 된다. 이를 실행하고 나면 프로그램은 어떤 액션도 수행하지 않는다. 대신 내부적으로 정의된 RDD 객체들의 지향성 비순환 그래프를 갖게 되고 이것이 나중에 액션을 수행할 때 쓰이게 된다.
스파크의 스케줄러는 액션을 수행할 때 필요한 RDD 연산의 물리적 실행 계획을 만든다. RDD에 collect()를 호출할 때 RDD의 모든 파티션이 실체화되고 드라이버 프로그램으로 전송된다. 스파크의 스케줄러는 연산되는 마지막 RDD에서 시작하여 연산해야 할 것을 역으로 추적해 나간다.
jobs: 진행 상황과 작업 단계, 태스크 등에 대한 수치들 제공 storage: 영속화된 RDD의 정보 Executors : 애플리케이션에 존재하는 익스큐터 목록 Environment: 스파크 설정 디버깅
스파크 로그의 위치 단독 모드 : 마스터 웹 UI에 직접 표시, 기본적으로 각 작업 노드의 스파크 설치 위치의 work/아래 저장 메소스 : 메소스 슬레이브 노드의 work/밑에 저장, 메소스 마스터 UI 얀 모드 : 애플리케이션에서 모든 로그에 대한 보고서를 생성하도록 얀의 로그 수집 도구를 사용하는 것
자체적으로 사용자 개입 없이 스파크는 RDD의 병렬화 수준이 적당한지에 대해 추론해 동작, 이는 대부분의 경우에 잘 동작
- 병렬화 개수가 너무 적으면 스파크가 리소스들을 놀리게 되는 경우가 발생
- 병렬화 개수가 너무 많다면 각 파티션에서의 작은 오버헤드라도 누적되면서 성능 문제가 심각해질 것
병렬화 수준을 조정할 수 있는 두가지 방법
- 데이터 셔플이 필요한 연산 간에 생성되는 RDD를 위한 병렬화 정도를 인자로 줄 수 있음
- 이미 존재하는 RDD를 더 적거나 더 많은 파티션을 갖도록 재배치할 수 있음
네트워크로 데이터를 전송하거나 디스크에 쓸 때 객체들을 직렬화해 바이너리 포맷으로 변환, 기본적으로 자바에 내장된 직렬화를 사용 카이로 : 더 빠르면서도 더 간편한 바이너리 포맷을 지니고 있지만 모든 타입의 객체가 자동적으로 직렬화 되지는 않음
- RDD 저장용 : persist, cache를 호출할 때 그 파티션들은 메모리 버퍼에 저장
- 셔플 및 집합 연산 버퍼 : 셔플 연산 수행 시에 셔플 출력 데이터를 저장하는 중간 버퍼를 만듬, 집합 연산의 중간 결과를 저장하거나 셔플 결과의 일부분으로 출력될 데이터를 저장하는 용도로 사용
- 사용자코드 : 임의의 사용자 코드를 실행하게 되므로 사용자 코드 자체도 상당한 양의 메모리를 쓰게 될 수 있음
구조화된 데이터를 다루는 스파크 인터페이스인 스파크 SQL
- 파이썬, 자파, 스칼라에서 DataFrame 추상화 클래스를 제공, DataFrame은 관계형 DB의 테이블과 유사한 개념
- 다양한 구조적 포맷의 데이터를 읽고 쓸 수 있다(json, hive 등)
- 스파크 프로그램 내부에서나 표준 데이터베이스 연결을 제공하는 외부툴을 써서 스파크 SQL을 통해 데이터를 질의
내부적으로 스파크SQL은 데이터프레임이라 불리는 RDD의 확장 모델에 기반
- 하이브 테이블이나 UDF, SerDes, 하이브 QL 등에 접근
HiveQL과 다른 하이브 기반 기능을 사용할 수 있는 HiveContext, 기존의 하이브 설치본을 필요로 하지 않음
추가적인 import 구문 필요 : 필요한 타입 정보를 가진 RDD를 스파크 SQL에 특화된 RDD로 변환해 질의를 요청하는 데에 필요 ` import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext ` HiveContext 혹은 하이브 의존성이 필요없는 경우 SQLContext를 만들어야 함
테이블에 대한 쿼리를 만들려면 HiveContext나 SQLContext에서 sql()을 호출 첫 번째로 필요한 것은 스파크 SQL에 어떤 데이터에 관해 질의할지를 알려주는 것 ` input = hiveCtx.jsonFile(inputFile)
input.registerTempTable("tweets")
topTweets = hiveCtx.sql(""" select text, retweetCount fromtweets order by retweetCount limit 10""") `
- 데이터를 읽어 들이는 것과 쿼리를 실행하는 것은 모두 결과로 데이터 프레임을 되돌려 줌
- 데이터 프레임은 전통적인 데이터베이스의 테이블과 비슷
- 데이터 프레임은 RDD에 접근할 수 있는 방법도 제공하므로( rdd() 호출 ) 기존의 map()이나 filter()를 쓴 연산도 가능
- 임시 테이블로 등록하여 hiveContext.sql이나 SQLContext.sql로 질의를 날릴 수 있다는 점
- row객체는 데이터프레임 내부에서 레코드를 의미하고, 단순히 필드들의 고정 길이 배열
- row객체는 인덱스가 주어진 필드의 값을 가져오기 위한 여러개의 게터 함수를 갖고 있음
- Object 타입을 되돌려 주무로 사용자가 직접 맞는 타입으로 캐스팅해야 함
- 스파크 SQL에서의 캐싱은 약간 다르게 동작
- 전체 객체를 저장하는 방식보다는 메모리를 효율적인 형태로 캐싱하도록 확실히 하기 위해서는 hiveCtx.cacheTable("tableName") 메소드를 사용 캐싱된 테이블은 메모리에 드라이버 프로그램이 실행되는 동안만 존재하므로 프로그램이 끝나면 새로 캐시해야 함
- 지원 타입 : 데이터를 하이브에서 읽어 올 때 스파크 SQL은 텍스트 파일. RC파일 ORC, 파케이, 에이브로, 프로토콜 버퍼 설정 : hive.site.xml 파일을 스파크의 conf/ 디렉터리에 복사해 주면 됨
- 하이브에서 쓰는 범위 이상의 데이터 소스들을 지원하기 위해 스파크SQL은 연결하기 쉬운 형태의 데이터 소스 API를 제공
- 동일한 스키마로 맞춰진 레코드들의 JSON 파일을 갖고 있다면 스파크 SQL은 파일을 스캔해 스키마를 추정하고 필드에 이름으로 접근할 수 있게 해준다.
- 케이스 클래스의 RDD가 내부적으로 DataFrame으로 변환
- BI 도구 등에서 스파크 클러스터에 접속하거나 클러스터를 다양한 사용자가 쓰는 것에 도움이 되도록 JDBC 기능을 제공
- 테이블을 만들고 목록을 보고 쿼리를 날려 보는 하이브 QL 명령어들을 실행해 볼 수 있음
- 스파크 SQL의 JDBC 서버를 써서 얻는 이점 중의 하나는 여러 프로그램들 사이에서 캐시된 테이블을 공유할 수 있다는 것
- 사용자 정의 함수는 파이썬, 스칼라, 자바 등으로 직접 만든 로직의 함수를 등록하여 SQL 내에서 호출할 수 있게 해준다.
- 스파크 SQL은 쓰고 있는 프로그래밍 언어에서 함수만 전달해서 쉽게 UDF를 등록할 수 있는 내장 메소드를 지원
- 표준 하이브 UDF들은 자동으로 불러와 사용할 수 있음
- 직접 작성한 UDF가 있다면 UDF의 jar 파일들이 애플리케이션에 포함되어 있는 지가 중요
- 스파크 SQL은 단순히 SQL에 친숙한 사용자들을 위한 것 이상의 기능을 제공하기 위한 컴포넌트
- spark.sql.codegen
스파크 SQL이 각 쿼리를 실행 전에 자바 바이트 코드로 컴파일
오래 걸리는 쿼리들이나 반복적인 쿼리들을 상당히 빠르게 실행시키게 해주나 매 쿼리마다 실행하는 컴파일러 때문에 추가적인 부담이 더해짐