# 附录2：运行 Spark

In [1]:
# !pip install pyspark

## 1. 检查 Spark 环境

运行一段简单的代码，检查 Spark 环境是否可用。

In [2]:
from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder \
    .appName("App") \
    .getOrCreate()

vertices = spark.createDataFrame([
    ("1", "A"),
    ("2", "B"),
    ("3", "C"),
    ("4", "D"),
    ("5", "E")
], ["id", "name"])

vertices.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/30 19:56:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+---+----+
| id|name|
+---+----+
|  1|   A|
|  2|   B|
|  3|   C|
|  4|   D|
|  5|   E|
+---+----+



## 2. 安装 & 使用 graphframes

[graphframes](https://github.com/graphframes/graphframes) 是 Spark 的一个图计算库。

首先安装 graphframes:

```bash
pip install graphframes
```

由于 graphframes 需要 jar 文件配合，无法直接在 Jupyter 中运行。请移步 [./graph/task.sh](./graph/task.sh)。

在 `task.sh` 中，我们用 `spark-submit` 执行一个简单的图连通算法。运行以下代码：

```bash
cd graph
sh task.sh
```

预期的计算结果如下：

```
+---+-------+---+-------------+
| id|   name|age|    component|
+---+-------+---+-------------+
|  g|  Gabby| 60| 146028888064|
|  f|  Fanny| 36| 412316860416|
|  e| Esther| 32| 670014898176|
|  d|  David| 29| 670014898176|
|  c|Charlie| 30|1047972020224|
|  b|    Bob| 36|1047972020224|
|  a|  Alice| 34| 670014898176|
+---+-------+---+-------------+
```

## 3. 定制 Python 环境

1）将名为 `myenv` 的本地环境，打包成 `myenv.tar.gz` 压缩文件。

```bash
conda activate myenv
conda install conda-pack
conda pack -o myenv.tar.gz
```

2）运行以下代码，将 `myenv.tar.gz` 文件上传到 Spark 节点。

```bash
# arguments
APP_NAME="SPARK_TASK"
P1D=$(date -d "1 day ago" +%Y-%m-%d)
QUEUE_NAME="bigdata"
files="FILE_A,FILE_B"
py_path="PATH/TO/YOUR_PYTHON_FILE.py"
table_name="YOUR_DB.YOUR_TABLE"

# task
spark-submit \
--master yarn \
--deploy-mode cluster \
--name "${APP_NAME}_${P1D}" \
--queue "${QUEUE_NAME}" \
--priority HIGH \
--conf spark.executor.memory=4g \
--conf spark.dynamicAllocation.minExecutors=800 \
--conf spark.pyspark.driver.python=./PythonEnv/bin/python \
--conf spark.pyspark.python=./PythonEnv/bin/python \
--files "${files}" \
--archives hdfs/path/to/myenv.tar.gz#PythonEnv \
  "${py_path}" \
    --date "${P1D}" \
    --outputTable "${table_name}"

# info
if [ $? -ne 0 ]; then
    echo -e "[INFO] ${APP_NAME} failed."
else
    echo -e "[INFO] ${APP_NAME} done."
fi
```

> **Note:** 使用 `--archives [HDFS_PATH]#[FOLDER_NAME]` 参数，Spark 会将 `[HDFS_PATH]` 路径上的压缩包文件解压到名为 `[FOLDER_NAME]` 的文件夹中。
