# Spark基础教程

Spark是一个快速和通用的大数据引擎，可以通俗的理解成一个分布式的大数据处理框架，允许用户将Spark部署在大量廉价的硬件之上，形成集群。Spark使用scala 实现，提供了 JAVA, Python，R等语言的调用接口。在本次学习我们将学习如何使用Spark清洗数据，并进行基础的特征工程操作，帮助大家掌握基础PySpark技能。

学习资料：

https://spark.apache.org/docs/latest/quick-start.html

https://spark.apache.org/docs/latest/sql-programming-guide.html

https://github.com/apache/spark/tree/4f25b3f712/examples/src/main/python

https://sparkbyexamples.com/pyspark-tutorial/

https://www.tutorialspoint.com/pyspark/index.htm

## 任务1：PySpark数据处理

- 步骤1：使用Python链接Spark环境
- 步骤2：创建dateframe数据
- 步骤3：用spark执行以下逻辑：找到数据行数、列数
- 步骤4：用spark筛选class为1的样本
- 步骤5：用spark筛选language >90 或 math> 90的样本

### 步骤1：使用Python链接Spark环境

In [1]:
# 添加此代码，进行spark初始化
import findspark
findspark.init()

### 步骤2：创建dateframe数据

In [2]:
# 导入相关库
import pandas as pd
from pyspark.sql import SparkSession

In [3]:
# 连接Spark
spark = SparkSession \
    .builder \
    .appName('pyspark') \
    .getOrCreate()


In [4]:
# 创建原始数据 
test = spark.createDataFrame([('001','1',100,87,67,83,98), ('002','2',87,81,90,83,83), ('003','3',86,91,83,89,63),
                                ('004','2',65,87,94,73,88), ('005','1',76,62,89,81,98), ('006','3',84,82,85,73,99),
                                ('007','3',56,76,63,72,87), ('008','1',55,62,46,78,71), ('009','2',63,72,87,98,64)],
                            ['number','class','language','math','english','physic','chemical'])
# 展示结果
test.show()

+------+-----+--------+----+-------+------+--------+
|number|class|language|math|english|physic|chemical|
+------+-----+--------+----+-------+------+--------+
|   001|    1|     100|  87|     67|    83|      98|
|   002|    2|      87|  81|     90|    83|      83|
|   003|    3|      86|  91|     83|    89|      63|
|   004|    2|      65|  87|     94|    73|      88|
|   005|    1|      76|  62|     89|    81|      98|
|   006|    3|      84|  82|     85|    73|      99|
|   007|    3|      56|  76|     63|    72|      87|
|   008|    1|      55|  62|     46|    78|      71|
|   009|    2|      63|  72|     87|    98|      64|
+------+-----+--------+----+-------+------+--------+



### 步骤3：用spark执行以下逻辑：找到数据行数、列数

In [5]:
# 统计行数
print(f'行数:{test.count()}')

# 统计列数
print(f'列数:{len(test.columns)}')

行数:9
列数:7


### 步骤4：用spark筛选class为1的样本

In [6]:
# 使用fliter过滤样本
test.filter(test['class'] == '1').show()

+------+-----+--------+----+-------+------+--------+
|number|class|language|math|english|physic|chemical|
+------+-----+--------+----+-------+------+--------+
|   001|    1|     100|  87|     67|    83|      98|
|   005|    1|      76|  62|     89|    81|      98|
|   008|    1|      55|  62|     46|    78|      71|
+------+-----+--------+----+-------+------+--------+



### 步骤5：用spark筛选language >90 或 math> 90的样本

In [7]:
# 过滤条件
f1 = test['language'] > 90
f2 = test['math'] > 90

# 使用fliter过滤样本
test.filter(f1|f2).show()

+------+-----+--------+----+-------+------+--------+
|number|class|language|math|english|physic|chemical|
+------+-----+--------+----+-------+------+--------+
|   001|    1|     100|  87|     67|    83|      98|
|   003|    3|      86|  91|     83|    89|      63|
+------+-----+--------+----+-------+------+--------+



## 任务2：PySpark数据统计

- 步骤1：读取文件https://cdn.coggle.club/Pokemon.csv
- 步骤2：将读取的进行保存，表头也需要保存
- 步骤3：分析每列的类型，取值个数
- 步骤4：分析每列是否包含缺失值

### 步骤1：读取文件

文件链接：https://cdn.coggle.club/Pokemon.csv

In [8]:
from pyspark import SparkFiles

# 从url读取数据
spark.sparkContext.addFile('https://cdn.coggle.club/Pokemon.csv')
df = spark.read.csv("file:///"+SparkFiles.get("Pokemon.csv"), header=True, inferSchema= True)
df = df.withColumnRenamed('Sp. Atk', 'Sp Atk')
df = df.withColumnRenamed('Sp. Def', 'Sp Def')

In [9]:
df.show()

+--------------------+------+------+-----+---+------+-------+------+------+-----+----------+---------+
|                Name|Type 1|Type 2|Total| HP|Attack|Defense|Sp Atk|Sp Def|Speed|Generation|Legendary|
+--------------------+------+------+-----+---+------+-------+------+------+-----+----------+---------+
|           Bulbasaur| Grass|Poison|  318| 45|    49|     49|    65|    65|   45|         1|    false|
|             Ivysaur| Grass|Poison|  405| 60|    62|     63|    80|    80|   60|         1|    false|
|            Venusaur| Grass|Poison|  525| 80|    82|     83|   100|   100|   80|         1|    false|
|VenusaurMega Venu...| Grass|Poison|  625| 80|   100|    123|   122|   120|   80|         1|    false|
|          Charmander|  Fire|  null|  309| 39|    52|     43|    60|    50|   65|         1|    false|
|          Charmeleon|  Fire|  null|  405| 58|    64|     58|    80|    65|   80|         1|    false|
|           Charizard|  Fire|Flying|  534| 78|    84|     78|   109|    8

### 步骤2：将读取的进行保存，表头也需要保存

In [10]:
# 将数据进行保存
df.write.csv("./data/Pokemon.csv", header=True, mode="overwrite")      

### 步骤3：分析每列的类型，取值个数

In [11]:
# 每列类型
df.printSchema()
# df.dtypes

root
 |-- Name: string (nullable = true)
 |-- Type 1: string (nullable = true)
 |-- Type 2: string (nullable = true)
 |-- Total: integer (nullable = true)
 |-- HP: integer (nullable = true)
 |-- Attack: integer (nullable = true)
 |-- Defense: integer (nullable = true)
 |-- Sp Atk: integer (nullable = true)
 |-- Sp Def: integer (nullable = true)
 |-- Speed: integer (nullable = true)
 |-- Generation: integer (nullable = true)
 |-- Legendary: boolean (nullable = true)



In [12]:
# 数据描述性统计
df.describe().show()

+-------+----------------+------+------+------------------+------------------+-----------------+------------------+----------------+-----------------+------------------+------------------+
|summary|            Name|Type 1|Type 2|             Total|                HP|           Attack|           Defense|          Sp Atk|           Sp Def|             Speed|        Generation|
+-------+----------------+------+------+------------------+------------------+-----------------+------------------+----------------+-----------------+------------------+------------------+
|  count|             800|   800|   414|               800|               800|              800|               800|             800|              800|               800|               800|
|   mean|            null|  null|  null|          435.1025|          69.25875|         79.00125|           73.8425|           72.82|          71.9025|           68.2775|           3.32375|
| stddev|            null|  null|  null|119.96303975551

### 步骤4：分析每列是否包含缺失值

In [13]:
import pyspark.sql.functions as fn

# 统计各列缺失值数量
df.agg(*[(fn.count('*') - fn.count(c)).alias(c+'_null') for c in df.columns]).show()

+---------+-----------+-----------+----------+-------+-----------+------------+-----------+-----------+----------+---------------+--------------+
|Name_null|Type 1_null|Type 2_null|Total_null|HP_null|Attack_null|Defense_null|Sp Atk_null|Sp Def_null|Speed_null|Generation_null|Legendary_null|
+---------+-----------+-----------+----------+-------+-----------+------------+-----------+-----------+----------+---------------+--------------+
|        0|          0|        386|         0|      0|          0|           0|          0|          0|         0|              0|             0|
+---------+-----------+-----------+----------+-------+-----------+------------+-----------+-----------+----------+---------------+--------------+



## 任务3：PySpark分组聚合

- 步骤1：读取文件https://cdn.coggle.club/Pokemon.csv
- 步骤2：学习groupby分组聚合的使用
- 步骤3：学习agg分组聚合的使用
- 步骤4：学习transform的使用
- 步骤5：使用groupby、agg、transform，统计数据在Type 1分组下 HP的均值

### 步骤1：读取文件

In [14]:
# from pyspark import SparkFiles

# # 从url读取数据
# spark.sparkContext.addFile('https://cdn.coggle.club/Pokemon.csv')
# df = spark.read.csv("file:///"+SparkFiles.get("Pokemon.csv"), header=True, inferSchema= True)
# df = df.withColumnRenamed('Sp. Atk', 'Sp Atk')
# df = df.withColumnRenamed('Sp. Def', 'Sp Def')

df.show()

+--------------------+------+------+-----+---+------+-------+------+------+-----+----------+---------+
|                Name|Type 1|Type 2|Total| HP|Attack|Defense|Sp Atk|Sp Def|Speed|Generation|Legendary|
+--------------------+------+------+-----+---+------+-------+------+------+-----+----------+---------+
|           Bulbasaur| Grass|Poison|  318| 45|    49|     49|    65|    65|   45|         1|    false|
|             Ivysaur| Grass|Poison|  405| 60|    62|     63|    80|    80|   60|         1|    false|
|            Venusaur| Grass|Poison|  525| 80|    82|     83|   100|   100|   80|         1|    false|
|VenusaurMega Venu...| Grass|Poison|  625| 80|   100|    123|   122|   120|   80|         1|    false|
|          Charmander|  Fire|  null|  309| 39|    52|     43|    60|    50|   65|         1|    false|
|          Charmeleon|  Fire|  null|  405| 58|    64|     58|    80|    65|   80|         1|    false|
|           Charizard|  Fire|Flying|  534| 78|    84|     78|   109|    8

### 步骤2：学习groupby分组聚合的使用

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html#pyspark.sql.DataFrame.groupBy

### 步骤3：学习agg分组聚合的使用

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.GroupedData.agg.html#pyspark.sql.GroupedData.agg

### 步骤4：学习transform的使用

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.transform.html

### 步骤5：使用groupby、agg、transform，统计数据在Type 1分组下 HP的均值

In [15]:
# df.groupBy('Type 1').avg('HP').collect()  # df.groupBy('Type 1').mean('HP').collect()
df.groupBy('Type 1').agg({'HP':'mean'}).collect()

[Row(Type 1='Water', avg(HP)=72.0625),
 Row(Type 1='Poison', avg(HP)=67.25),
 Row(Type 1='Steel', avg(HP)=65.22222222222223),
 Row(Type 1='Rock', avg(HP)=65.36363636363636),
 Row(Type 1='Ice', avg(HP)=72.0),
 Row(Type 1='Ghost', avg(HP)=64.4375),
 Row(Type 1='Fairy', avg(HP)=74.11764705882354),
 Row(Type 1='Psychic', avg(HP)=70.63157894736842),
 Row(Type 1='Dragon', avg(HP)=83.3125),
 Row(Type 1='Flying', avg(HP)=70.75),
 Row(Type 1='Bug', avg(HP)=56.88405797101449),
 Row(Type 1='Electric', avg(HP)=59.79545454545455),
 Row(Type 1='Fire', avg(HP)=69.90384615384616),
 Row(Type 1='Ground', avg(HP)=73.78125),
 Row(Type 1='Dark', avg(HP)=66.80645161290323),
 Row(Type 1='Fighting', avg(HP)=69.85185185185185),
 Row(Type 1='Grass', avg(HP)=67.27142857142857),
 Row(Type 1='Normal', avg(HP)=77.27551020408163)]

## 任务4：SparkSQL基础语法

- 步骤1：使用Spark SQL完成任务1里面的数据筛选
- 步骤2：使用Spark SQL完成任务2里面的统计（列可以不统计）
- 步骤3：使用Spark SQL完成任务3的分组统计

### 步骤1：使用Spark SQL完成任务1里面的数据筛选

In [16]:
# 用spark筛选test数据集中 language>90 或 math>90 的样本
# # 过滤条件
# f1 = test['language'] > 90
# f2 = test['math'] > 90
# # 使用fliter过滤样本
# test.filter(f1|f2).show()

test.createOrReplaceTempView("test")
spark.sql("SELECT * FROM test WHERE language>90 or math>90").collect()

[Row(number='001', class='1', language=100, math=87, english=67, physic=83, chemical=98),
 Row(number='003', class='3', language=86, math=91, english=83, physic=89, chemical=63)]

### 步骤2：使用Spark SQL完成任务2里面的统计（列可以不统计）

### 步骤3：使用Spark SQL完成任务3的分组统计