In [2]:
# Initialise the SparkContext (the PySpark shell creates `sc` automatically).
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
# getOrCreate() reuses an already-running context, so re-running this cell
# does not fail because a second SparkContext cannot be started.
sc = SparkContext.getOrCreate(conf=conf)

In [None]:
# Shut down Spark
# sc.stop()
# or System.exit(0)
# or sys.exit()

In [2]:
# Create an RDD of strings with textFile()
lines =  sc.textFile("../README.md")

In [3]:
# Apply the filter() transformation: keep only the lines containing "Python"
pythonLines = lines.filter(lambda line: "Python" in line)

In [7]:
# Call the first() action, which returns the first element of the filtered RDD
pythonLines.first()

'* The Python examples require urllib3'

In [50]:
# persist() keeps the RDD in memory so it is not recomputed for every action
pythonLines.persist()
pythonLines.count()
# unpersist() removes the RDD from the cache
pythonLines.unpersist()

PythonRDD[3] at RDD at PythonRDD.scala:53

In [8]:
# parallelize() builds an RDD directly from an in-memory collection; use it for small tests
lines = sc.parallelize(['pandas', 'i like pandas'])

In [10]:
# filter() is a transformation, so this cell displays the resulting RDD object
# rather than the matching lines; an action such as collect() is needed to see them.
lines.filter(lambda line: "pandas" in line)

PythonRDD[8] at RDD at PythonRDD.scala:53

### 读取log

In [25]:
# Load the log file, then select the lines containing "error" and the lines
# containing "warning" (both are case-sensitive substring tests).
inputRDD = sc.textFile("../data/logs/log1.log")
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
# union() keeps duplicates, so a line matching both filters would appear twice.
badLinesRDD = errorsRDD.union(warningsRDD)

In [27]:
# count() is an action: it returns the number of elements in badLinesRDD
badLinesRDD.count()

1

In [28]:
# Fetch at most 10 elements and print them
for line in badLinesRDD.take(10):
    print(line)

71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] "GET /error HTTP/1.1" 404 505 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"


In [29]:
# Fetch every element (make sure the local machine has enough memory)
for line in badLinesRDD.collect():
    print(line)

71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] "GET /error HTTP/1.1" 404 505 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"


### 向Spark传递函数

In [None]:
# Pass a function to Spark as an inline lambda.
# inputRDD (the log lines loaded earlier) is used because no variable named
# `rdd` exists yet at this point of the notebook.
word = inputRDD.filter(lambda x: 'error' in x)

In [None]:
# Alternatively, pass a named top-level function:
def contains_error(s):
    """Return True when 'error' occurs in ``s`` (membership test), else False."""
    found = 'error' in s
    return found

# inputRDD (the log lines loaded earlier) is used because no variable named
# `rdd` exists yet at this point of the notebook.
word = inputRDD.filter(contains_error)

In [31]:
# map(): apply the function to every element (here: square each number),
# then collect() the results into a Python list
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x*x).collect()
squared

[1, 4, 9, 16]

In [36]:
# flatMap(): split every line on spaces and flatten the pieces into one RDD of words
lines = sc.parallelize(['hello world', 'hi'])
words = lines.flatMap(lambda text: text.split(' '))
words.collect()

['hello', 'world', 'hi']

In [39]:
# map() with the same splitter keeps one list of words per input line (no flattening)
words = lines.map(lambda text: text.split(' '))
words.collect()

[['hello', 'world'], ['hi']]

### 行动操作

In [40]:
# reduce(): combine all elements pairwise with the given function (here: addition)
nums = sc.parallelize([1, 2, 3, 4])
# Named `total` rather than `sum` so the built-in sum() is not shadowed
# for the remaining cells of the notebook.
total = nums.reduce(lambda x, y: x + y)
total

10

In [48]:
# takeSample(withReplacement, num): draw 2 elements without replacement.
# No seed is passed, so the sample may differ between runs.
nums.takeSample(False, 2)

[2, 3]

### pair RDD

#### 新建pair RDD

In [53]:
# Build a pair RDD that uses the first word of each line as the key
lines = sc.parallelize(['hello world', 'hi'])
pairs = lines.map(lambda x: (x.split(" ")[0], x))
pairs.collect()

[('hello', 'hello world'), ('hi', 'hi')]

In [54]:
# Pair RDD built from literal (key, value) tuples.
# NOTE(review): the argument is a set literal, so identical tuples would be
# collapsed and the element order is the set's order, not the written one.
pairRDD = sc.parallelize({(1,2), (3,4), (3,6)})

#### 操作pair RDD

In [57]:
# filter: keep the pairs whose value (index 1) has a length below 3
result = pairs.filter(lambda key_value: len(key_value[1]) < 3)
result.collect()

[('hi', 'hi')]

In [58]:
# mapValues: transform only the value of each pair (here: add 1); keys stay unchanged
result = pairRDD.mapValues(lambda x: x+1)
result.collect()

[(1, 3), (3, 5), (3, 7)]

In [68]:
# Turn every value into a (value, 1) tuple, the first step towards a per-key average.
# NOTE(review): the input is a set literal, so the collected order differs from the written order.
rdd = sc.parallelize({('panda',0),('pink',3),('pirate',3),('panda',1),('pink',4)})
result = rdd.mapValues(lambda x: (x,1))
result.collect()

[('pink', (3, 1)),
 ('pink', (4, 1)),
 ('pirate', (3, 1)),
 ('panda', (1, 1)),
 ('panda', (0, 1))]

In [69]:
# Per-key (sum, count): map each value to (value, 1), then add the tuples element-wise per key
rdd = sc.parallelize({('panda',0),('pink',3),('pirate',3),('panda',1),('pink',4)})
result = rdd.mapValues(lambda x: (x,1)).reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
result.collect()

[('pink', (7, 2)), ('pirate', (3, 1)), ('panda', (1, 2))]

#### 单词计数

##### 方法 1

In [74]:
# Word count, step 1: split each line on single spaces and emit (word, 1) for every word
rdd = sc.textFile('../data/words_count.txt')
words = rdd.flatMap(lambda x: x.split(' '))
result = words.map(lambda x: (x,1))
result.collect()

[('pandas', 1),
 ('flink', 1),
 ('numpy', 1),
 ('pandas', 1),
 ('python', 1),
 ('flink', 1),
 ('spark', 1)]

In [73]:
# Word count, step 2: same pipeline, then add up the 1s per word with reduceByKey
rdd = sc.textFile('../data/words_count.txt')
words = rdd.flatMap(lambda x: x.split(' '))
result = words.map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y)
result.collect()

[('pandas', 2), ('flink', 2), ('numpy', 1), ('python', 1), ('spark', 1)]

##### 方法 2

In [77]:
# Word count in one step: countByValue() is an action that returns the
# per-word counts as a dict-like object instead of an RDD
rdd = sc.textFile('../data/words_count.txt')
result = rdd.flatMap(lambda x: x.split(' ')).countByValue()
result

defaultdict(int,
            {'pandas': 2, 'flink': 2, 'numpy': 1, 'python': 1, 'spark': 1})

#### 求平均值

In [90]:
# combineByKey builds a (sum, count) accumulator per key from three functions:
#   1st lambda: turns the first value seen for a key into (value, 1)
#   2nd lambda: folds a further value y into an existing accumulator x
#   3rd lambda: merges two accumulators by adding them element-wise
rdd = sc.parallelize({('panda',0),('pink',3),('pirate',3),('panda',1),('pink',4)})
sum_count = rdd.combineByKey((lambda x: (x,1)),
                             (lambda x,y: (x[0]+y, x[1]+1)),
                             (lambda x,y: (x[0]+y[0], x[1]+y[1])))
sum_count.collect()

[('pink', (7, 2)), ('pirate', (3, 1)), ('panda', (1, 2))]

In [100]:
# Average per key: divide the accumulated sum (xy[0]) by the count (xy[1])
result = sum_count.mapValues(lambda xy: xy[0]/xy[1])
result.collect()

[('pink', 3.5), ('pirate', 3.0), ('panda', 0.5)]

In [101]:
# collectAsMap() returns the pairs as a dict
result.collectAsMap()

{'pink': 3.5, 'pirate': 3.0, 'panda': 0.5}

#### 设置操作的并行度

In [106]:
# Sum the values per key without specifying a partition count
data = [("a", 3), ("b", 4), ("a", 1)]
rdd1 = sc.parallelize(data).reduceByKey(lambda x, y: x+y) # default parallelism
rdd1

PythonRDD[201] at RDD at PythonRDD.scala:53

In [107]:
# Same aggregation, passing 10 as the second argument of reduceByKey
rdd2 = sc.parallelize(data).reduceByKey(lambda x, y: x+y, 10) # custom parallelism
rdd2

PythonRDD[207] at RDD at PythonRDD.scala:53

In [110]:
# Show the number of partitions of the RDD
rdd1.getNumPartitions()

1

#### 数据分组

一般来说，内置的分组程序比手动分组要快，比如：

rdd.reduceByKey(func) 比 rdd.groupByKey().mapValues(lambda values: functools.reduce(func, values)) 快

#### 连接

leftOuterJoin(), rightOuterJoin() 和 join() # inner join

#### 排序

In [26]:
# sortByKey(): sort the pairs by key, in descending order here.
# keyfunc converts each key with str() before comparison, so keys are ordered
# as text (identical to numeric order for these single-digit keys).
rdd = sc.parallelize({(1,0),(2,3),(5,3),(3,1),(0,4)})
rdd1 = rdd.sortByKey(ascending=False, numPartitions=None, keyfunc = lambda x: str(x))
rdd1.collect()

[(5, 3), (3, 1), (2, 3), (1, 0), (0, 4)]

In [29]:
# sortBy(): sort by a function of the whole element, here the value (index 1), descending
rdd = sc.parallelize({(1,0),(2,3),(5,3),(3,1),(0,4)})
rdd1 = rdd.sortBy(ascending=False, numPartitions=None, keyfunc=lambda x:x[1])
rdd1.collect()

[(0, 4), (2, 3), (5, 3), (3, 1), (1, 0)]

#### pair RDD的行动操作

In [30]:
# countByKey(): number of elements per key, returned as a dict-like object
rdd = sc.parallelize({(1,2), (3,4), (3,6)})
rdd.countByKey()

defaultdict(int, {1: 1, 3: 2})

In [31]:
rdd.collectAsMap() # note: converting to a map de-duplicates the keys, keeping the latest record

{1: 2, 3: 6}

In [32]:
# lookup(key): list of the values stored under the given key
rdd.lookup(3)

[4, 6]

### 文件读取和保存

#### 文本文件

##### 读取

In [38]:
# Read a text file into an RDD of strings
_input = sc.textFile('../README.md')

##### 保存

In [37]:
# Write the pair RDD out as text
rdd = sc.parallelize({('panda',0),('pink',3),('pirate',3),('panda',1),('pink',4)})
rdd.saveAsTextFile('../result/test') # path is a directory that holds the files of the individual partitions

#### JSON

##### 读取

In [56]:
import json
# Read the file as text lines and parse each line as one JSON document
_input = sc.textFile('../data/json/pandainfo.json')
data = _input.map(lambda x: json.loads(x))

In [42]:
# Fetch the parsed records for inspection
data.collect()

[{'name': 'Sparky The Bear', 'lovesPandas': True},
 {'name': 'Holden'},
 {'name': 'Sparky The Bear',
  'lovesPandas': True,
  'knows': {'friends': ['holden']}}]

##### 保存

In [60]:
# Keep only the records that love pandas and serialise them back to JSON strings.
# dict.get() is used because not every record has a 'lovesPandas' key; indexing
# with x['lovesPandas'] would raise KeyError as soon as an action runs.
data.filter(lambda x: x.get('lovesPandas')).map(lambda x: json.dumps(x))#.saveAsTextFile('../data/json/love_pandas.json')

PythonRDD[93] at RDD at PythonRDD.scala:53

In [77]:
# Saving fails for these paths: the same path as the input file; a path that already contains other files.
# dict.get() is used because not every record has a 'lovesPandas' key; indexing
# with x['lovesPandas'] would raise KeyError when the save action runs.
data.filter(lambda x: x.get('lovesPandas')).map(lambda x: json.dumps(x)).saveAsTextFile("../result/love_pandas/")

#### CSV

##### 读取

In [104]:
import csv
import io

def load_record(line):
    """Parse one line of CSV text into a record.

    Args:
        line: A single CSV record as text, e.g. ``'holden,panda'``.

    Returns:
        A mapping with the keys ``'name'`` and ``'favouriteAnimal'``.

    Raises:
        StopIteration: If ``line`` is empty and therefore holds no record.
    """
    buffer = io.StringIO(line)
    reader = csv.DictReader(buffer, fieldnames=['name', 'favouriteAnimal'])
    # Python 3 reader objects have no .next() method; the built-in next() advances them.
    return next(reader)
# load_record parses a single line; apply it per element, e.g. with RDD.map(load_record).
# Load the CSV file as plain text lines; the lines are not parsed in this cell
_input = sc.textFile('../data/csv/favourite_animals.csv')

In [105]:
# Show the raw text lines of the CSV file
_input.collect()

['holden,panda', 'notholden,notpanda', 'spark,bear']

##### 保存

In [108]:
def writeRecords(records):
    """Serialise an iterable of record dicts into one block of CSV text.

    Args:
        records: Iterable of dicts with the keys ``'name'`` and
            ``'favouriteAnimal'`` (the shape produced by ``load_record``).

    Returns:
        A one-element list holding the CSV text of all records.

    Raises:
        ValueError: If a record contains a key other than the two field names.
    """
    # io.StringIO is the in-memory text buffer; the os module has no StringIO.
    output = io.StringIO()
    # The field name is spelled as in load_record so parsed records can be written back.
    writer = csv.DictWriter(output, fieldnames=["name", "favouriteAnimal"])
    writer.writerows(records)
    return [output.getvalue()]

In [None]:
# _input holds raw text lines while writeRecords expects dict records, so each
# line is parsed with load_record before the partitions are serialised to CSV.
_input.map(load_record).mapPartitions(writeRecords).saveAsTextFile('../result/csv/love_pandas.csv')