In [2]:
from pyspark import SparkConf, SparkContext

# 初始化SparkConf对象
conf = SparkConf().setAppName('miniProject').setMaster('local[*]')
sc = SparkContext.getOrCreate(conf)

## 初始化RDD

### 本地内存中已经有一份序列数据(比如python的list)，可以通过sc.parallelize去初始化一个RDD。当执行这个操作以后，list中的元素将被自动分块(partitioned)，并且把每一块送到集群上的不同机器上。

In [7]:
# 利用list创建一个RDD，parallelize()方法可以把list，numpy array，pdseries，datafram转成rdd
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd)

# getNumPartitions()查看list被分成了几部分
print(rdd.getNumPartitions())

# glom().collect()查看分区状况
print(rdd.glom().collect())

ParallelCollectionRDD[4] at parallelize at PythonRDD.scala:195
8
[[], [1], [], [2], [3], [], [4], [5]]


### 创建RDD的另一个方法是直接把文本读到RDD。文本的每一行都会被当做一个item，不过需要注意的一点是，Spark一般默认给定的路径是指向HDFS的，如果要从本地读取文件的话，给一个file://开头（windows下是以file:\\开头）的全局路径。

In [10]:
# 记录当前pyspark工作环境
import os

cwd = os.getcwd()
print(cwd)

# 要读入文件的全路径
rdd = sc.textFile('file://'+cwd+'/wordcount.txt')

# wholeTextFiles()读取真个文件夹的所有文件
# rdd = sc.wholeTextFiles('file://'+cwd')

# first()方法读取rdd数据第一个item
rdd.first()

/Users/wangyutian/code/python/spark


'word count'

## RDD Transformation

### rdd常用transformation

map() 对RDD的每一个item都执行同一个操作  
flatMap() 对RDD中的item执行同一个操作以后得到一个list，然后以平铺的方式把这些list里所有的结果组成新的list  
filter() 筛选出来满足条件的item  
distinct() 对RDD中的item去重  
sample() 从RDD中的item中采样一部分出来，有放回或者无放回  
sortBy() 对RDD中的item进行排序  

In [14]:
numbersRDD = sc.parallelize(range(1, 10+1))
print(numbersRDD)

# map()对rdd的每一个item都执行同一个操作
squaresRDD = numbersRDD.map(lambda x: x**2)
print(squaresRDD.collect())

# filter()筛选出来满足条件的item
filterdRDD = numbersRDD.filter(lambda x: x % 2 == 0)
print(filterdRDD.collect())

PythonRDD[17] at RDD at PythonRDD.scala:53
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
[2, 4, 6, 8, 10]


In [15]:
# flatMap() 对RDD中的item执行同一个操作以后得到一个list，然后以平铺的方式把这些list里所有的结果组成新的list
sentencesRDD = sc.parallelize(['Hello world', 'My name is yt'])
wordsRDD = sentencesRDD.flatMap(lambda sentence: sentence.split(' '))
print(wordsRDD.collect())
print(wordsRDD.count())

['Hello', 'world', 'My', 'name', 'is', 'yt']
6


这里如果使用map的结果是[[‘Hello’, ‘world’], [‘My’, ‘name’, ‘is’, ‘Patrick’]]，   
使用flatmap的结果是全部展开[‘Hello’, ‘world’, ‘My’, ‘name’, ‘is’, ‘Patrick’]

### 各个transformation可以串联使用

In [20]:
def doubleIfOdd(x):
    if x % 2 == 1:
        return 2 * x
    else:
        return x


numbersRDD = sc.parallelize(range(1, 10 + 1))
resultRDD = numbersRDD.map(doubleIfOdd).filter(lambda x: x > 6).distinct()
resultRDD.collect()

[8, 10, 18, 14]

### 当遇到更复杂的结构，比如被称作“pair RDDs”的以元组形式组织的k-v对（key, value），Spark中针对这种item结构的数据，定义了一些transform和action.

reduceByKey(): 对所有有着相同key的items执行reduce操作  
groupByKey(): 返回类似(key, listOfValues)元组的RDD，后面的value List 是同一个key下面的  
sortByKey(): 按照key排序  
countByKey(): 按照key去对item个数进行统计  
collectAsMap(): 和collect有些类似，但是返回的是k-v的字典  

In [47]:
rdd = sc.parallelize(['Hello world', 'Hello New York', 'York says hello'])
# 将word映射成(word, 1)并对所有有着相同key的items执行reduce操作
resultRDD = rdd.flatMap(lambda sentence: sentence.split(' '))\
    .map(lambda word: word.lower())\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda x, y: x + y)
print(resultRDD.collect())

# collectAsMap类似collect,以k-v字典的形式返回
result = resultRDD.collectAsMap()
print(result)

# sortByKey按键排序
print(resultRDD.sortByKey(ascending=True).take(2))

# 取出频次最高的两个词
print(resultRDD.sortBy(lambda x: x[1], ascending=False).take(2))

[('hello', 3), ('york', 2), ('world', 1), ('new', 1), ('says', 1)]
{'hello': 3, 'york': 2, 'world': 1, 'new': 1, 'says': 1}
[('hello', 3), ('new', 1)]
[('hello', 3), ('york', 2)]


## RDD间的操作

### 如果有2个RDD，可以通过下面这些操作，对它们进行集合运算得到1个新的RDD

rdd1.union(rdd2): 所有rdd1和rdd2中的item组合（并集）  
rdd1.intersection(rdd2): rdd1 和 rdd2的交集  
rdd1.substract(rdd2): 所有在rdd1中但不在rdd2中的item（差集）  
rdd1.cartesian(rdd2): rdd1 和 rdd2中所有的元素笛卡尔乘积（正交和）  

In [49]:
# 初始化两个RDD
numbersRDD = sc.parallelize([1, 2, 3])
moreNumbersRDD = sc.parallelize([2, 3, 4])

# intersection()取交集
print(numbersRDD.intersection(moreNumbersRDD).collect())

# substract()取差集
print(numbersRDD.subtract(moreNumbersRDD).collect())

# cartesian()取笛卡尔积
print(numbersRDD.cartesian(moreNumbersRDD).collect())

[2, 3]
[1]
[(1, 2), (1, 3), (1, 4), (2, 2), (2, 3), (2, 4), (3, 2), (3, 3), (3, 4)]


### 在给定2个RDD后，可以通过一个类似SQL的方式去join它们

In [52]:
# Home of different people
homesRDD = sc.parallelize([
    ('Brussels', 'John'),
    ('Brussels', 'Jack'),
    ('Leuven', 'Jane'),
    ('Antwerp', 'Jill'),
])

# Quality of life index for various cities
lifeQualityRDD = sc.parallelize([
    ('Brussels', 10),
    ('Antwerp', 7),
    ('RestOfFlanders', 5),
])

# join
print(homesRDD.join(lifeQualityRDD).collect())
# leftOuterJoin
print(homesRDD.leftOuterJoin(lifeQualityRDD).collect())
# rightOuterJoin
print(homesRDD.rightOuterJoin(lifeQualityRDD).collect())
# cogroup
homesRDD.cogroup(lifeQualityRDD).collect()

[('Brussels', ('John', 10)), ('Brussels', ('Jack', 10)), ('Antwerp', ('Jill', 7))]
[('Brussels', ('John', 10)), ('Brussels', ('Jack', 10)), ('Antwerp', ('Jill', 7)), ('Leuven', ('Jane', None))]
[('Brussels', ('John', 10)), ('Brussels', ('Jack', 10)), ('Antwerp', ('Jill', 7)), ('RestOfFlanders', (None, 5))]


[('Brussels',
  (<pyspark.resultiterable.ResultIterable at 0x10a40dbe0>,
   <pyspark.resultiterable.ResultIterable at 0x10a40d8d0>)),
 ('Antwerp',
  (<pyspark.resultiterable.ResultIterable at 0x10a40da58>,
   <pyspark.resultiterable.ResultIterable at 0x10a40d7b8>)),
 ('RestOfFlanders',
  (<pyspark.resultiterable.ResultIterable at 0x10a40db70>,
   <pyspark.resultiterable.ResultIterable at 0x10a40d898>)),
 ('Leuven',
  (<pyspark.resultiterable.ResultIterable at 0x10a40d908>,
   <pyspark.resultiterable.ResultIterable at 0x10a3dfb70>))]

In [53]:
# Oops!  Those <ResultIterable>s are Spark's way of returning a list
# that we can walk over, without materializing the list.
# Let's materialize the lists to make the above more readable:
(homesRDD
 .cogroup(lifeQualityRDD)
 .map(lambda x:(x[0], (list(x[1][0]), list(x[1][1]))))
 .collect())

[('Brussels', (['John', 'Jack'], [10])),
 ('Antwerp', (['Jill'], [7])),
 ('RestOfFlanders', ([], [5])),
 ('Leuven', (['Jane'], []))]

## 惰性计算，actions方法

**特别注意：Spark的一个核心概念是惰性计算。当你把一个RDD转换成另一个的时候，这个转换不会立即生效执行！！！Spark会把它先记在心里，等到真的有actions需要取转换结果时，才会重新组织transformations(因为可能有一连串的变换)。这样可以避免不必要的中间结果存储和通信。**

### 常见的action如下，当它们出现的时候，表明需要执行上面定义过的transform了

collect(): 计算所有的items并返回所有的结果到driver端，接着 collect()会以Python list的形式返回结果  
first(): 和上面是类似的，不过只返回第1个item  
take(n): 类似，但是返回n个item  
count(): 计算RDD中item的个数  
top(n): 返回头n个items，按照自然结果排序  
reduce(): 对RDD中的items做聚合  

In [58]:
rdd = sc.parallelize(range(1, 10+1))
rdd.reduce(lambda x, y: x + y)

55

### 有时候需要重复用到某个transform序列得到的RDD结果。但是一遍遍重复计算显然是要开销的，所以我们可以通过一个叫做cache()的操作把它暂时地存储在内存中。缓存RDD结果对于重复迭代的操作非常有用，比如很多机器学习的算法，训练过程需要重复迭代。

In [61]:
import numpy as np

numbersRDD = sc.parallelize(np.linspace(1.0, 10.0, 10))
squaresRDD = numbersRDD.map(lambda x: x**2)

squaresRDD.cache()

avg = squaresRDD.reduce(lambda x, y: x + y)/squaresRDD.count()
print(avg)

38.5
