# 07장 클러스터에서 운영하기

## 01절 스파크 실행 구조

- local 모드 
    - 하나의 자바 프로세스에서 여러개의 쓰레드로 구동
 

- 분산모드
    - 중앙조정자( 드라이버 ) + 여러개의 분산작업 노드( 익스큐터 )로 구성
    - 드라이버와 익스큐터는 각각 독립된 자바 프로세스 

![](spark01.png)

### 분산모드 구성 요소 - 드라이버 

- 사용자의 main 메소드가 실행되는 프로세스 
- 주요 역할
    - 사용자 프로그램을 테스크로 변환하여 클러스터로 전송
    - 익스큐터에서의 개별 작업을 위한 스케쥴링을 조정

### 드라이버의 역할
- 사용자 프로그램을 테스트로 변환
    - 1) 연산들의 관계를 DAG( Directed Acyclic Graph ) 생성
    - 2) DAG를 물리적인 실행계획으로 변환
        - 최적화를 거쳐 여러 개의 Stage로 변환
        - 각 stage는 여러 개의 테스크를 구성
    - 3) 단위작업들을 묶어서 클러스트로 전송
    
![](spark02.png)
    
- 익스큐터에서 태스크들의 스케쥴링
    - 익스큐터들은 시작시 드라이버에 등록됨
    - 드라이버는 항상 실행중인 익스큐터를 감시
        - 테스크를 데이터 위치에 기반해 적절한 위치에서 실행이 되도록 함.
    - 4040 포트를 사용해셔 웹 인터페이스를 실행 정보를 볼 수 있음.
    
    
![](spark03.png)    

### 분산모드 구성 요소 - 익스큐터

- 개별 태스크를 실행하는 작업 실행 프로세스
- 주요 역할
    - 태스크 실행후 결과를 드라이버로 전송
    - 사용자 프로그램에서 캐시하는 RDD를 저장하기 위한 메모리 공간 제공


####  여러 2개의 python 코드에서  익스큐터에서 실행되는 코드를 찾아보자~~~    

- 첫번째 
```
sc = SparkContext(master, "WordCount") 
lines = sc.parallelize(["pandas", "i like pandas"]) 
result = lines.flatMap(lambda x: x.split(" ")).countByValue() 
for key, value in result.iteritems(): 
    print "%s %i" % (key, value) 
```

- 두번째

```
sc = SparkContext(sparkMaster, appName="ChapterSixExample") 
file = sc.textFile(inputFile) 

def validateSign(sign): 
     global validSignCount, invalidSignCount 
     if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z", sign): 
         validSignCount += 1 
         return True 
     else: 
         invalidSignCount += 1 
         return False 
 
 
validSigns = callSigns.filter(validateSign) 
contactCounts = validSigns.map( 
     lambda sign: (sign, 1)).reduceByKey((lambda x, y: x + y)) 
```    


####  MapReduce의 자바 코드에서 익스큐터에서 실행되는 코드를 찾아보자~~~    
```
public class WordCount {  
 
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {   
 
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {   
       String line = value.toString();  
       StringTokenizer tokenizer = new StringTokenizer(line);  
       while (tokenizer.hasMoreTokens()) {  
         word.set(tokenizer.nextToken());  
         output.collect(word, one);  
       }  
     }  
   }  
 
    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {   
      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {   
       int sum = 0;  
       while (values.hasNext()) {  
         sum += values.next().get();  
       }  
       output.collect(key, new IntWritable(sum));  
     }  
   }  
 
    public static void main(String[] args) throws Exception {   
      JobConf conf = new JobConf(WordCount.class);   
     conf.setJobName("wordcount");  
 
     conf.setOutputKeyClass(Text.class);  
     conf.setOutputValueClass(IntWritable.class);  
 
     conf.setMapperClass(Map.class);  
     conf.setCombinerClass(Reduce.class);  
     conf.setReducerClass(Reduce.class);  
 
     conf.setInputFormat(TextInputFormat.class);  
     conf.setOutputFormat(TextOutputFormat.class);  
 
     FileInputFormat.setInputPaths(conf, new Path(args[0]));  
     FileOutputFormat.setOutputPath(conf, new Path(args[1]));  
 
     JobClient.runJob(conf);  
   }  
}  
```

### 분산모드 구성 요소 - 클러스터 매니저

- 스파크는 익스큐터를 실행하기 위해 클러스터 매너저에 의존
- 지원하는 매너저 종류
    - standalone(내장매니저), Hadoop Yarn, Apache mesos


### Spark-Submit 

- 클러스터 매니저에 스파크 어플리케이션을 제출하는 툴
    - 예) spark-submit  --master local[*]  my_script.py
    - 예) spark-submit  --master spark://host:7077  my_script.py
    - 예) spark-submit  --master yarn  my_script.py
    - 예) spark-submit  --master mesos://mesosmaster:5050  my_script.py


- 주요옵션
    - master : 클러스터 매니저 설정
    - deploy-mode : 드라이버 프로그램이 실행되는 곳( client / cluster )
    - class : 자바나 스칼라일때 main함수가 들어 있는 클래스
    - jars/py-files : 사용자 어플리케이션에 추가돠어야 할 라이브러리 목록
    - executor-memory : 익스큐터 프로세스가 사용할 메모리
    - driver-memory : 드라이버 프로세스가 사용할 메모리
    
#### deploy-mode
- 클라이언트 모드 
    - 드라이버는 spark-submit의 일부로 실행됨
    - 드라이버 프로그램을 출력을 직접 확인 가능( 표준출력 등)
    - 어플리케이션 실행하는 동안 작업 노드들에 계속 연결되어 있어야 함.


- 클러스터 모드
    - 드라이버가 클러스터내의 작업 노드중에 하나에서 실행됨
    - 실행후 개입하지 않는 방식
    - 파이션 언어는 지원하지 않음.
    

## 02절  의존성 라이브러리의 패키징

- 클러스터 머신들에 해당 라이브러리가 위치해야 함
- 자주 사용하는 것들은 모든 머신들에 직접 설치해놓자.
- python
    - py-files 옵션을 사용하여 라이브러리 제출
    
- java, scala
    - jars 옵션을 사용함.
    - maven, sbt 사용함.

## 03절 스파크 어플리케이션 간의 스케줄링

- 기본적으로 클러스터 매니저의 정책에 의존함.

### 클러스터 매니저

- 공통
    - 익스큐터에서 가능한 많은 코어를 써서 적은 개수 익스큐터로 어플리케이션 실행
    - 분산된 데이터에서의 로컬리티 보장, 데이터가 있는 곳에 테스트 실행
- Standalone
    - 설정 : executor-memory , total-executor-cores
    - 익스코터의 최대 개수 설정 만큼 퍼뜨리는 식으로 동작
- Hadoop Yarn
    - 설정 : executor-memory , executor-cores, num-excuters
- Apache Mesos
    - 설정 : executor-memory , total-executor-cores

### 클러스터 매니저 선택

- Standalone
    - 새로 배포 예정의 어플리케이션일때 <== 설정이 가장 쉬움
- Hadoop Yarn
    - 다른 어플리케이션과 같이 돌리거나 우수한 자원 스케쥴링이 필요한 경우
- Apache Mesos
    - 스파크 쉘과 같은 대화형 어플리케이션 <==  실행간 CPU 사용량을 세밀히 조정 
- HDFS 
    - 저장소와 빠른 접근을 위해서 HDFS와 동일한 노드에 설치

### 간단한 실습

- https://bitbucket.org/mwiewiorka/sparkseq/wiki/Examples

```
spark-shell  --master yarn-client  --executor-memory 2G  --executor-cores 3


import pl.elka.pw.sparkseq.seqAnalysis.SparkSeqAnalysis
val seqAnalysis = new SparkSeqAnalysis(sc,"file:///home/bigbio/NA18489.chrom20.ILLUMINA.bwa.YRI.exome.20121211.bam",1,1,1)
seqAnalysis.getCoverageBase().filter(p=>(p._2>=10)).count()


```