<p style="font-family: Arial; font-size:2.5em;color:purple; font-style:bold"><br/>
I9000平台Spark使用基础
</p>

----------------------------

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
I9000云平台鸟瞰（复习）
</p>

- 应用发布与管理平台
- 分布式存储与计算

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
大数据存储与计算最常用的两个工具
</p>

- HDFS
  - Hadoop分布式文件系统
- Spark
  - 分布式计算引擎

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
HDFS – Hadoop分布式文件系统
</p>

- 文件系统
  - 树形结构组织的目录和文件
  - Linux命令行使用ls、cd等命令切换和查看
  - Windows使用我的电脑查看
  - 在编程语言例如Python中，通过调用函数访问
- HDFS分布式文件系统，使用者角度
  - 与普通文件系统类似
  - 主要的区别是能够存放大文件，比如一个文件几百GB或几个T
- HDFS作为数据存储层，属于I9000系统的组成部分

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
HDFS为什么能够存储大文件（架构概览）
</p>

<img src="i9000-spark-image/hdfs.png"/>

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
在I9000中使用HDFS
</p>

- HDFS是I9000的数据层
- 通过I9000的Web界面上传数据
  - 存放在HDFS中
- 使用这些文件
  - 通过文件在HDFS上的URI来访问文件

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
练习 – 上传文件到HDFS
</p>

- 本地文件路径（XXX替换为云桌面用户名）

> /home/XXX/i9000-training/spark/spark-readme.md

- 请保存上传文件的URI，下面练习会用到

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
Spark – 分布式计算引擎
</p>

- 统一的平台
  - 数据加载、SQL分析、机器学习、流式计算
  - 一致性的编程接口
  - 同时支持交互式分析和应用程序开发
- Spark只是计算引擎，可以采用多种数据引擎
  - HDFS
  - Sql数据库
  - NoSql数据库
  - 消息队列，例如Apache Kafka

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
Spark应用程序组成
</p>

<img src="i9000-spark-image/spark-overview.png"/>

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
Spark多语言支持
</p>

- Spark核心是Scala语言编写
  - 运行在JVM上
- Spark支持的用户语言
  - Scala
  - Java
  - Python
  - R

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
使用Python时的进程关系
</p>

<img src="i9000-spark-image/python-process.png"/>

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
Spark应用编程接口（API）
</p>

- 低层无结构的RDD
- 高层有结构的DataFrame
- 高层DataFrame建筑在低层RDD之上

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
Spark编程入口
</p>

- 2.x版本
  - SparkSession
- 1.x版本
  - SparkContext
- 今天的例子使用1.x版本

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
I9000中Spark交互式环境
</p>

- Python Spark Shell
- Jupyter Notebook
  - 讲稿正在用的环境

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
Python Spark Shell
</p>

- 使用Python语言的Spark交互环境
- 在I9000中启动
  - Jupyter terminal
- 自动创建SparkContext对象
  - pyspark中，绑定到sc变量上
- SparkContext位于驱动进程（driver）中
  - 是用户代码与Spark集群沟通的桥梁
- 每个Spark应用程序对应一个SparkContext对象

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
低层编程接口 - RDD
</p>

- Resilient Distributed Dataset
  - 弹性分布式数据集
- 想象为一个大的列表，类似python的list
  - 不同的是，元素存放在集群的多个节点中
- RDD是不可变的
  - 修改RDD，不会改变旧的，而是创建新的RDD

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
RDD的创建
</p>

- 从现有序列创建
  - SparkContext对象的parallelize()方法
- 从外部存储系统创建，例如HDFS
  - 例如文本文件：SparkContext对象的textFile()方法

In [None]:
# 导入python spark库
import pyspark

# 创建SparkContext对象
sc = pyspark.SparkContext()

# 将本地列表分发到集群，创建RDD
rdd_1 = sc.parallelize([1, 2, 3, 4, 5])

# 查看RDD的前三个元素
print(rdd_1.take(3))

# 收集集群中RDD的所有元素到本地
print(rdd_1.collect())

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
RDD的两类操作
</p>

- 转换操作（transformation）
  - 输入一个RDD，输出一个新的RDD
- action操作
  - 输入一个RDD，输出一个值

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
RDD的转换操作示例 - map
</p>

In [None]:
new_rdd = rdd_1.map(lambda x: x * 2)
new_rdd.collect()

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
RDD的action操作示例 - reduce
</p>

In [None]:
rdd_1.reduce(lambda result, x: result + x)

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
练习中用到的两个操作
</p>

- flatMap
- reduceByKey

In [None]:
new_rdd = rdd_1.flatMap(lambda x: [x, x])
new_rdd.collect()

In [None]:
rdd_2 = sc.parallelize([('a', 1), ('a', 2), ('b', 5), ('b', 5), ('b', 2)])
new_rdd = rdd_2.reduceByKey(lambda result, x: result + x)
new_rdd.collect()

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
练习 - Spark版单词计数
</p>

1. 打开I9000的jupyter terminal
2. 运行pyspark
3. 输入下面代码运行，一次一行

** hdfs://XXX 替换为刚才上传文件在HDFS上的URI **

In [None]:
lines = sc.textFile('hdfs://XXX')
f = lambda line: [(str(word).lower(), 1) for word in line.split()]
words = lines.flatMap(f)
word_count_rdd = words.reduceByKey(lambda total, count: total + count)
print(word_count_rdd.take(3))
word_to_count = dict(word_count_rdd.collect())
print(word_to_count['spark'])

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
美国加州房屋数据
</p>

- block group（区块组）
  - 一行是一个block group
  - 一个block group的人口数量平均在一两千
- 数据共有9列
  - longitude：经度
  - latitude：纬度
  - housingMedianAge：区块组的居民的年龄中位数
  - totalRooms：区块组的房屋内的房间总数
  - totalBedrooms：区块组的房屋内的卧室总数
  - population：区块组的居民的数量
  - households：区块组的房屋数量
  - medianIncome：区块组的居民收入中位数
  - medianHouseValue：区块组的房屋价值中位数

> ** medianHouseValue是输出（因变量） ** <br/>
> ** 其余变量是输入（自变量） **

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
练习 - 加载并查看房屋数据
</p>

1. 上传房屋数据到HDFS
  - /home/XXX/i9000-training/spark/house/cal_housing.data
  - /home/xxx/i9000-training/spark/house/cal_housing.domain
2. 在spark shell中运行下面的程序语句，一次一条

In [None]:
# 从HDFS文件中创建RDD
lines = sc.textFile('XXX')
header = sc.textFile('YYY')

# 查看列信息
print(header. collect())


In [None]:
# 查看前三条数据
print(lines.take(3))

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
将RDD转换为DataFrame
</p>

DataFrame是结构化的类型，类似于数据库的表或excel表格

In [None]:
# 导入相关模块
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# 将原始文本行转为浮点数列表
f = lambda line: [float(sp) for sp in line.split(', ')]
rows = lines.map(f)

# 定义schema（DataFrame的结构描述）
names = 'longitude,latitude,housingMedianAge,totalRooms,totalBedrooms,population,households,medianIncome,medianHouseValue'
fields = [StructField(name, FloatType()) for name in names.split(',')]
schema = StructType(fields)

# 创建DataFrame
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(rows, schema)

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
DataFrame操作示例
</p>

In [None]:
# 显示数据摘要
df.describe().show()

In [None]:
# 显示DataFrame的前5条
df.show(5)

In [None]:
# 显示DataFrame的schema
df.printSchema()

In [None]:
# 显示人口数量和卧室数量两列内容
df.select('population', 'totalBedrooms').show(5)

In [None]:
# 选择年龄中位数超过40的区块组
df.filter(df['housingMedianAge'] > 40).show(5)

In [None]:
# 按年龄中位数分组统计区块组的数量
df.groupBy("housingMedianAge").count().show(5)

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
DataFrame数据预处理示例 - 调整目标列的单位
</p>

In [None]:
# 查看调整前的目标列
df.select('medianHouseValue').show(5)

In [None]:
# 调整为以10万为单位
from pyspark.sql.functions import *
df = df.withColumn('medianHouseValue', col('medianHouseValue')/100000)

In [None]:
# 查看调整后的目标列
df.select('medianHouseValue').show(5)

<p style="font-family: Arial; font-size:1.5em;color:purple; font-style:bold">
练习 - DataFrame练习
</p>

- 程序语句来源
  - /home/XXX/i9000-training/spark/house/house.py
- 在Spark Shell中运行下面两部分程序语句
  - 房屋数据练习 - 创建DataFrame
  - 房屋数据练习 - DataFrame相关操作