# Spark

* Spark Core

* Spark Core 扩展

* SparkSQL

* SparkStreaming

Spark-Core是Spark最核心的包，是整个Spark的基础，提供了分布式任务调度和基本的I/O功能。

Spark提供了批处理(RDDs)，结构化查询(SparkSQL   DataFrame)，流计算(SparkStreaming)，机器学习(MLlib)，图计算(GraphzX)等组件，这些组件都是依托于通用的计算引擎RDDs，所以Spark-Core的RDDs是整个Spark的基础。


特点：

* 速度快，在内存运行比MapReduce快100倍，在硬盘运行比MapReduce快10倍

* 可以兼容现有大多数集群工具，YARN，Mesos，Kubernets，Spark Standalone，可以访问HBase，Hive，HDFS，Cassndra等

* 提供80多种运算符

* 提供全栈服务，并且支持JAVA，Scala，Python，R，SQL等语言的API


Spark可以将程序运行在两种模式下：

* 单机，通过线程模拟并行来运行程序

* 集群，使用集群管理工具将程序运行在集群上


## Spark 集群

Spark如何将一个程序运行在一个集群上？

要想管理数以千计的机器集群，有必要使用集群管理工具，比如Yarn，Mesos，Kubernets，Standalone。

流程是，首先使用Spark的Client提交任务给集群管理工具申请资源，然后将计算任务分发到集群中运行。

Client --> Cluster Manager --> Cluster

Client 提交程序给Driver program, 创建SparkContext,该进程调用Spark程序的main方法，并且启动SparkContext

Cluser 包含许多Woker Node进程，Worker是一个守护进程，负责启动和管理Executor, Executor负责运行Spark Task

总结即Driver将程序转化为Tasks，并提交给Executor执行  


Driver 和 Worker在什么时候启动？在不同的集群管理工具中表现不同。

* Standalone

  Standalone集群中分为两个角色，Master和Slave(Worker)，在Standalone集群中，启动之初就会创建固定数量的Worker。Driver的启动分为两种模式，Client和Cluster。在Client模式中，Driver运行在Client端，在Client启动时被启动；在Cluster模式中，Driver运行在某个Worker中，随应用的提交而启动。 

* Yarn

  在Yarn集群中也分为Client和Cluster模式。Cluster模式中，首先会和ResourceManager交互，开启ApplicationMaster，其中运行了Driver，Driver创建基础环境后，会由RM提供对应的容器，运行Executor，Executor会反向向Driver注册自己，并申请Tasks执行。 



## Spark集群环境配置

 在清华国内源下载了spark-3.1.1

 1. 安装：

 ```
 cd /usr/local/softwares/
 rz -E
 tar -xzvf spark-3.1.1 -C ../servers/
 mv spark-3.1.1 spark-3.1.1
 ```

 2. 配置 JAVA 路径， Master 路径：

 ```shell
 cd spark-3.1.1
 ls
 cd conf
 mv spark-env.sh.template spark-env.sh
 vim spark-env.sh
 ```

 ​	添加以下：

 ```
 #指定 JAVA Home
 export JAVA_HOME=/usr/local/servers/jdk1.8.0_271
 #指定 Spark Master 地址
 export SPARK_MASTER_HOST=node01
 export SPARK_MASTER_PORT=7077
 ```

 ​	配置slaves：

 ​	指定从节点，从而使用`sbin/start-all.sh`启动集群的时候，可以一键启动整个集群所有的worker

 ```shell
 cd /usr/local/servers/spark-3.1.1
 ls
 cd conf
 cp workers.template workers
 vim workers
 ```

 ```
 node01
 node02
 node03
 ```

 ​	配置 HistoryServer：

 ​	默认情况下，Spark程序运行完毕后，就无法查看运行记录的Web UI了，通过配置HistoryServer可以提供一个服务，通过读取日志文件，使得我们在程序运行结束后，依然能够查看运行过程。

 ```shell
 cd /usr/local/servers/spark-3.1.1
 ls
 cd conf
 cp spark-defaults.conf.template spark-defaults.conf
 vim spark-defaults.conf
 ```

 在最后添加以下：

     ```
     spark.eventLog.enabled true
     spark.eventLog.dir hdfs://node01:8020/spark_log
     spark.eventLog.compress true
     ```

     在spark-env.sh中添加HistoryServer启动参数

     ```
     # 指定 Spark History 运行参数
     export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=4000 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://node01:8020/spark_log"

     ```

 ​	TODO 为Spark创建HDFS中的日志目录

     ```shell
     hdfs dfs -mkdir -p /spark_log
     ```

 3. 分发

    将Spark安装包分发给集群中其它机器

    ```
    cd /usr/local/servers/
    scp -r spark-3.1.1 root@node02:$PWD
    scp -r spark-3.1.1 root@node03:$PWD
    ```

 4. 启动Spark Master 和 Slaves，以及 HistoryServer

    ```shell
    cd /usr/loca/servers/spark-3.1.1
    sbin/start-all.sh
    sbin/start-history-server.sh
    ```

    查看进程

    ```
    jps
    ```

    在unix系统中，ps可以查看进程状态process status，有哪些进程以及进程id。

    jps是java提供的一个可以查看所有java进程pid的命令。

**系统环境变量配置 ~/.bash_profile**


```
# Java11 path
export JAVA_11_HOME=/usr/local/java/jdk-11.0.2.jdk/Contents/Home
# Java8 path
export JAVA_8_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_271.jdk/Contents/Home
export JAVA_HOME=$JAVA_11_HOME
alias jdk11="export JAVA_HOME=$JAVA_11_HOME"
alias jdk8="export JAVA_HOME=$JAVA_8_HOME"
export PATH=$JAVA_HOME/bin:$PATH:.
# Maven
export M3_HOME=/usr/local/maven/apache-maven-3.6.3
export PATH=$M3_HOME/bin:$PATH
# jenv
export PATH="$HOME/.jenv/bin:$PATH"
eval "$(jenv init -)"

#Spark path
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip:$PYTHONPATH
export PYSPARK_PYTHON=python3
```

## Spark集群高可用搭建

使用Zookeeper实现Spark Standalone高可用：

对于Spark Standalone集群来说，当worker调度出现问题时，会自动的弹性容错，将出错的Task调度到其它woker执行。但是对于Master来说，可能会出现单点失败问题。Spark提供了两种方式满足高可用：

- 使用Zookeeper实现Masters的主备切换
- 使用文件系统做主备切换，此类场景很小

1. 停止Spark集群

```
cd /usr/local/severs/spark-3.1.1/
ls
cd sbin
stop-all.sh
```

2. 修改配置文件，指定Zookeeper路径，增加Spark运行时参数

```
cd /usr/local/severs/spark-3.1.1/
ls
cd conf
vim spark-env.sh
```

注释掉SPARK_MASTER_HOST地址这一行, 添加Spark启动参数

```
# export SPARK_MASTER_HOST=node01

# 指定 spark 运行时参数
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node01:2181,node02:2181,node03:2181 -Dspark.deploy.zookeeper.dir=/spark"
```

3. 分发配置文件到整个集群

```
cd /usr/local/severs/spark-3.1.1/conf
scp spark-env.sh node02:$PWD
scp spark-env.sh node03:$PWD
```

4. 启动

在node01上启动整个集群

```
cd /usr/loca/servers/spark-3.1.1
sbin/start-all.sh
sbin/start-history-server.sh
```

在node02上单独再启动一个Master

```
cd /usr/loca/servers/spark-3.1.1
sbin/start-master.sh
```

5. 查看node01 master 和 node02 master 的 Web UI

一个为Alive(主),一个为Standby(备)

Spark各服务端口：

Master WebUI port: node01:8080

Worker WebUI port: node01:8081

History Server port: node01:4000


## Spark程序运行

1. 进入spark安装目录

```
cd /usr/local/severs/spark-3.1.1/
```

2. 运行spark示例任务

```
bin/spark-submit \   #spark-submit 提交命令
--class org.apache.spark.examples.SparkPi \    #指定jar包中运行的类
--master spark://node01:7077,node02:7077,node03:7077 \    #指定集群master地址，因为已经搭建了高可用，把几个节点都写上
--executor-memory 1G \
--total-executor-core 2 \
/usr/local/severs/spark-3.1.1/examples/jars/spark-examples_2.11-2.2.3.jar \    #指定jar包的位置
100  #指定运行的函数参数
```


```
bin/pyspark --master spark://node01:7077
```

```
bin/spark-shell --master spark://node01:7077
```


编写spark程序的两种方式：

1. spark-shell 

    适用于数据集的探索，测试
    
    spark-shell 的原理是把每一行scala代码编译成类，最后交由spark执行
    
    启动spark-shell：
    
    进入spark安装目录，执行`spark-shell --master master`就可以提交Spark任务
    
    master地址的设置：

    | master地址        | 说明                                                         |
    | ----------------- | ------------------------------------------------------------ |
    | local[N]          | 使用N条worker线程在本地运行                                  |
    | spark://host:port | 在spark standalone中运行，指定spark集群的master地址，端口默认为7077 |
    | mesos://host:port | 在apache mesos中运行，指定mesos地址                          |
    | yarn              | 在yarn中运行，yarn地址由环境变量HADOOP_HADOOP_CONF_DIR指定   |
    
    
**读取本地文件**

```
rdd = sc.textFile("file:///usr/local/wordcount.txt")
```

**读取HDFS文件三种方式**

```
rdd = sc.textFile("hdfs://node01:8020/dataset/wordcount.txt")

rdd = sc.textFile("hdfs:///dataset/wordcount.txt")

rdd = sc.textFile("/dataset/wordcount.txt")
```


2. spark-submit

    适用于独立应用，上线集群 运行
    
    
**独立应用运行的两种方式**

1. 本地运行

2. 提交运行

   修改代码 -> 打包上传 -> bin/spark-submit 集群运行

## RDD

RDD是一个弹性分布式数据集 

RDD 特点：

* RDD是数据集

* RDD是编程模型

* RDD之间可以相互依赖

* RDD是可以分区的

 
SparkContext是Spark-Core的入口组件，是Spark程序的入口。如果把Spark程序分为前后端，服务端就是运行Spark程序的集群，前端就是Driver，在Driver中，SparkContext是最主要的组件，是Driver的核心，Driver运行时首先创建的组件。

SparkContext主要作用是连接集群，创建RDD，累加器，广播变量等。 

创建RDD的三种方式：

* 从本地创建

```
rdd = sc.parallelize(seq, numSlices)
```

分区数由自己指定

* 从文件创建

```
rdd = sc.textFile("hdfs:///dataset/..")
```

路径也可以是file:// ，如果路径是hdfs，则rdd的分区数等于hdfs文件的block数量，即一个block就是一个分区。

* 从其它RDD衍生


在RDD上执行算子操作生成新的RDD

In [4]:
#pip install findspark
#pip install py4j
#Py4j可以使运行于python解释器的python程序动态的访问java虚拟机中的java对象

import findspark

findspark.init()

In [7]:
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark import SparkConf

conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
rdd=sc.parallelize([1,2,3,4,5])
rdd1=rdd.map(lambda r:r+10)
print(rdd1.collect())
sc.stop()

[11, 12, 13, 14, 15]


## Ref

https://fuhailin.github.io/Spark-Tutorial/