# 单值 RDD 操作

## map

`map` 的用途是将 `RDD` 中的每个元素进行操作(加工), 操作后每个元素得到新的值, 使用这些新的元素生成一个新的 `RDD`

In [1]:
data = [1, 2, 3, 4, 5, 6, 7] # 1
rdd = sc.parallelize(data, 3) # 2
arr = rdd.map(lambda x: x + 1).collect() #3
print(arr) # 4

[2, 3, 4, 5, 6, 7, 8]


1. **创建 python 数组**: 命名为 data
2. **创建 RDD 对象**: 使用 SparkContent 的 parallelize 方法将 data 数组分为 3 个 partitions 并返回 RDD 对象赋值给 rdd 变量. *注意: 这里用 sc 表示 SparkContent, 这是因为启动 pyspark 时, 系统自动导入了 SparkContent 类. 其实等于默认执行了 `from pyspark import SparkContent as sc`*
3. **创建 python 数组**: 命名为 arr. 指定了一个 lambda 函数(也称之为**匿名函数**), 该函数的作用是对 rdd 中的每个元素进行加 1 操作. 最后的 collect 方法是 RDD 类的方法, 用于将一个 RDD 对象转换为 python 的 数组.
4. **控制台输出/打印** python 数组 arr

## 综合练习

已知文件在hadoop文件系统的 `data/walmart.txt` 中包含 4 列信息, 是沃尔玛全国分店每天的流水, 分别表示分店ID(id), 交易量(x), 交易额(y)和利润(z), 类型均为整数, 数据举例如下

|ID|交易量(x)|交易额(y)|利润(z)|
|:--|:--|:--|:--|
|1|23|5600|5|
|2|30|5800|7|
|1|27|5000|10|
|3|24|6900|5|
|2|45|5800|7|
|2|28|5800|7|

> 请问: 如何得到以下 SQL 等价的结算结果(假设存在表 T 中):

```sql
select id, sum(x), max(y), average(x) from T group by id;
```

> 扩展: 如何得到以下 SQL 等价的计算结果(提示, 用 reduce 算子)

```sql
select sum(x), max(y), min(z), average(x) from T;
```

In [2]:
# from file to create RDD, RDD's partitions number is 2
walmartRDD = sc.textFile('hdfs://hadoop:9000/data/walmart.txt', 2)

# split every line to list
walmartRDD.map(lambda line: line.split(',')).collect()

[['1', '23', '5600', '5'],
 ['2', '30', '5800', '7'],
 ['1', '27', '5000', '10'],
 ['3', '24', '6900', '5'],
 ['2', '45', '5800', '7'],
 ['2', '28', '5800', '7']]

In [3]:
# create tuple(id, x) x=交易量
xRDD = walmartRDD.map(lambda line: line.split(',')) \
    .map(lambda row: (row[0], int(row[1])))
xRDD.collect()

[('1', 23), ('2', 30), ('1', 27), ('3', 24), ('2', 45), ('2', 28)]

In [4]:
# create tuple(id, y) y=交易额
yRDD = walmartRDD.map(lambda line: line.split(',')) \
    .map(lambda row: (row[0], int(row[2])))
yRDD.collect()

[('1', 5600), ('2', 5800), ('1', 5000), ('3', 6900), ('2', 5800), ('2', 5800)]

In [5]:
# create tuple(id, z) z=利润
zRDD = walmartRDD.map(lambda line: line.split(',')) \
    .map(lambda row: (row[0], int(row[3])))
zRDD.collect()

[('1', 5), ('2', 7), ('1', 10), ('3', 5), ('2', 7), ('2', 7)]

### 问题答案

In [6]:
# sum(x) group by id
xRDD.reduceByKey(lambda x, y: x + y) \
    .collect()

[('1', 50), ('2', 103), ('3', 24)]

In [7]:
# max(y) group by id
# way 1
yRDD.groupByKey() \
    .mapValues(max) \
    .collect()

[('1', 5600), ('2', 5800), ('3', 6900)]

In [8]:
# max(y) group by id
# way 2
seqOp = (lambda x, y: y if x < y else x)
combOp = (lambda x, y: y if x < y else x)
yRDD.aggregateByKey(0, seqOp, combOp) \
    .collect()

[('1', 5600), ('2', 5800), ('3', 6900)]

> **上面有两种方法求最大值, 第二种方法在性能上要远远优于第一种**

In [9]:
# min(z) group by id
# way 1
zRDD.groupByKey() \
    .mapValues(min) \
    .collect()

[('1', 5), ('2', 7), ('3', 5)]

In [10]:
# min(z) group by id
# way 2
import sys
seqOp = (lambda x, y: y if x > y else x)
combOp = (lambda x, y: y if x > y else x)
zRDD.aggregateByKey(sys.maxsize, seqOp, combOp) \
    .collect()

[('1', 5), ('2', 7), ('3', 5)]

> **上面有两种方法求最大值, 第二种方法在性能上要远远优于第一种**

In [11]:
# average(x) group by id
seqOp = (lambda acc, val: (acc[0] + val, acc[1] + 1))
combOp = (lambda p1, p2: (p1[0] + p2[0], p1[1] + p2[1]))

(xRDD.aggregateByKey((0, 0), seqOp, combOp)       # retrun RDD's element's type is (id, (sum(x), count))
    .map(lambda el: (el[0], el[1][0] / el[1][1])) # retrun RDD's element's type is (id, average)
    .collect()
)

[('1', 25.0), ('2', 34.333333333333336), ('3', 24.0)]

### 扩展问题的答案

In [12]:
# sum(x)
xRDD.map(lambda x: x[1]) \
    .reduce(lambda x, y: x + y)

177

In [13]:
# max(y)
yRDD.map(lambda x: x[1]) \
    .reduce(lambda x, y: x if x > y else y)

6900

In [14]:
# min(z)
zRDD.map(lambda x: x[1]) \
    .reduce(lambda x, y: x if x < y else y)

5

In [16]:
# average(x)
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
iSum, iCount = xRDD.map(lambda x: x[1]) \
    .aggregate((0, 0), seqOp, combOp)
iSum / iCount

29.5

## About aggregate

In [18]:
arr = [1, 2, 3, 4]
rdd = sc.parallelize(arr)
seqOp = (lambda x, y: x + y)
combOp = (lambda x, y: x + y)
rdd.aggregate(0, seqOp, combOp)

10

当初始值不是 0 的时候, aggregate 函数的行为变得比较奇特

In [19]:
rdd.aggregate(1, seqOp, combOp)

12

正常理解应该是 11, 即应该是 $1+1+2+3+4=11$ (其中第一个 1 是初始值), 但结果为何是 12 呢?

主要原因是在 combOp 这个函数, 他最后做了 partions 的合并动作, 在进行 combin 时, 参数 x 的值 1, 即为初始化的值, 参数 y 的值是 seqOp 函数结算的结果, 这里是 11, 因此 $x+y=1+11=12$

根据上面的规则, 当初始值为 5 的时候, $x+y=5+5+1+2+3+4=20$

In [20]:
rdd.aggregate(5, seqOp, combOp)

20

再进一步说明:
初始值=5 rdd 中的元素为 [1,2,3,4]
- seqOp的计算步骤为: 初始值 + rdd = 5 + arr[1,2,3,4] = 15
- combOp的计算步骤为: 初始值 5 + seqOp 的结果 15 = 20

下面我们再试一试有 partition 的情况下

In [21]:
rdd = sc.parallelize(arr, 2)
rdd.aggregate(2, seqOp, combOp)

16

- 分区为 2 时, rdd 的值被拆分为 2 部分 [1,2] 和 [3,4]
- seqOp 的结果是: partion 1 is $2+1+2=5$; partion 2 is $2+3+4=9$
- combOp 的结果是: partion 1 + partion 2 + 初始值 = $5+9+2=16$

整理成公式: 假设分区是 $n$, 初始值是 $m$, $z$ 是 rdd 内元素的合计结果, 则最后得到的结果 $r$ 为

$r=n \times m+m+z$