In [1]:
!pip freeze | grep pyspark

pyspark==2.4.3


http://spark.apache.org/docs/latest/ml-pipeline.html

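# Environment setup issues

## Installing the pyspark bundled with Spark
The pyspark version installed by pip install pyspark may be incompatible with the Spark environment. To be safe, install the pyspark that ships inside the Spark distribution.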

In [None]:
cd $SPARK_HOME/python/
python setup.py install

# spark-submit & spark-shell

## Using the spark-shell
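After the shell starts, the following objects have already been created for you:
* sc, a SparkContext object, the entry point for RDDs
* spark, a SparkSession object, the entry point for DataFrames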

In [217]:
type(sc)

pyspark.context.SparkContext

In [218]:
sc

In [133]:
type(spark)

pyspark.sql.session.SparkSession

In [134]:
spark

In [None]:
$ ./bin/pyspark --master local[*] --driver-memory 4G --executor-memory 16G
# Or, to also add code.py to the search path (in order to later be able to import code), use:
$ ./bin/pyspark --master local[4] --py-files code.py

nohup pyspark2 --master local[*] --driver-memory 32G --executor-memory 32G &

### use the Jupyter notebook

In [None]:
PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark

Or set the environment variables:

In [None]:
vim /etc/bashrc

export PYSPARK_DRIVER_PYTHON=jupyter 
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip=* --port=1414 --allow-root --no-browser --NotebookApp.token=aaa --notebook-dir=/home/ian/code/pyspark"

source /etc/bashrc

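Parameter notes:
* --ip=* : the IP address the notebook server listens on; the wildcard means all interfaces, so the server can be reached from any address
* --port=1414: the port the notebook server listens on
* --allow-root: allow Jupyter to be started by root
* --no-browser: do not open a browser on startup
* --NotebookApp.token=aaa: set the access token
* --notebook-dir=/home/ian/code/pyspark: set the Jupyter root directory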

### Common problems

* Exception: Python in worker has different version 2.7 than that in driver 3.7  
To fix this, set the environment variable PYSPARK_PYTHON to the same Python interpreter path on every node of the Spark cluster.
* Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 7, nn1.leapstack.cn, executor 1): java.io.IOException: Cannot run program "/root/.pyenv/shims/python": error=13, Permission denied  
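This one is easy to diagnose: it is a file permission problem. Note that fixing the permissions of the file itself is not enough; the directories that contain it must be fixed as well. Here the culprit is mainly /root, which normally lacks the +x permission: chmod a+x /root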


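## Submitting a .py file with spark-submit

Running spark-submit wordcount.py on the command line creates a result folder in the current directory. The part-00000 file inside it holds the output.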

In [None]:
"""
@file:wordcount.py
"""
from pyspark.sql import SparkSession
master = 'local'
spark = SparkSession.builder.appName('test').master(master).getOrCreate()
sc = spark.sparkContext
filename = 'f:/README.md'
logData = sc.textFile(filename)
wordsRDD = logData.flatMap(lambda x:x.split(" ")).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
out_filename = 'result'
wordsRDD.saveAsTextFile(out_filename)
words = wordsRDD.collect()
spark.stop()

# RDD
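Resilient Distributed Datasets

* The name says most of it. First, it is a dataset (a collection of data).
* Resilient: fault tolerant. A lost partition can be recomputed from the transformations that produced it.
* Distributed: the dataset is stored across a distributed cluster.
* An RDD is read-only.
* An RDD is a logical concept, made up mainly of Dependency, Partition and Partitioner.
    * Partition records how the data is split.
    An RDD is divided into multiple Partitions (**note that a Partition is not physical storage either: if the 3 Partitions of rdd1 are transformed into the 4 Partitions of rdd2, the Partitions of rdd2 only record which partition of rdd1 their data comes from and through which transformation, without actually computing anything**). **Each Task processes one Partition**.
    * Dependency records how Partitions evolve across transformations.
    * Partitioner is the strategy for redistributing keys during a shuffle, i.e. it computes from the key which partition a key-value pair belongs to.
    https://blog.csdn.net/u011564172/article/details/54667057
* In Spark, jobs run one after another by default, and the stages of a job that depend on each other also run one after another; only the tasks within a stage run in parallel. For tasks to run in parallel, the data must be accessible in parallel. Every task performs the same operations and the same computation, only on different data. (A task usually corresponds to one CPU core.)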

![](imgs/2850424-bd83ee9d357a2a79.png)

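## RDD Dependency
### Logical execution plan of a Spark job
The logical execution plan describes the data flow of a job:
* which transformations the job goes through
* which intermediate RDDs are produced
* how those RDDs depend on each other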

![](imgs/2850424-055e2f8200f9b269.png)

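### How dependencies between RDDs are established
The data dependency between RDDs has three parts:
* The dependency between the RDDs themselves. Does the RDD being produced (RDD x from here on) depend on one parent RDD or on several?
    • rddC = rddA.map(func) // rddC depends on rddA
    • rddC = rddA.join(rddB) // rddC depends on both rddA and rddB
* How many partitions does RDD x have?
    • By default, the largest partition count among all of its parent RDDs
    • For some RDDs, such as ShuffledRDD, the user can specify the partition count
* How do the partitions of RDD x depend on the partitions of its parent RDDs? On one parent partition or on several?
    • Each partition of RDD x can depend on one or more partitions of a parent RDD
    • The dependency can be full or partial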

![](imgs/2850424-54a0159a0e0b0262.png)

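## RDD Partition

### What a partition is
A partition is the unit of parallel computation inside an RDD. The dataset of an RDD is logically divided into multiple slices, each called a partition, and the way it is partitioned determines the granularity of parallel computation.

__The computation for each partition runs in a single task, so the number of tasks is determined by the number of partitions of the RDD (more precisely, of the last RDD in the job)__.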

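### What partitioning is for
* More parallelism
* Less network traffic

Spark partitions a key-value RDD by the hash code of the key and guarantees that identical keys end up in the same partition, and therefore on the same node. Aggregating such an RDD by key then needs no shuffle.

Why does a MapReduce computation need a shuffle at all? Most of the network transfer in MapReduce happens in the shuffle phase, and the root cause is that identical keys live on different nodes, so aggregating by key forces a shuffle. A shuffle is very heavy on the network: all the data has to be mixed together and sent across it before identical keys can meet. The need to shuffle is a consequence of how the data is stored.

Spark takes this lesson on board and partitions by key, that is, by the hash code of the key. Identical keys always have the same hash code, so when, say, 100 TB of data is split into 10 parts of about 10 TB each, identical keys are certain to fall into the same part. For example, if an RDD is split into 100 parts on a cluster of 10 nodes, each node stores 10 parts, and each part is one partition. Identical keys sit in the same partition and hence on the same node.

An uneven key distribution makes some partitions larger than others. Partitions cannot be made exactly equal, but they are kept within a similar range. As a result, some work that needs a shuffle in MapReduce needs none in Spark, and this is the basic way Spark cuts network transfer.

A join involves two tables, and it is usually not possible to have both partitioned in advance. The common approach is to partition the large, frequently used table beforehand; when a small table is joined to it, only the small table is shuffled. The large table is not.

Transformations that shuffle data between worker nodes benefit greatly from partitioning: cogroup, groupWith, join, leftOuterJoin, rightOuterJoin, groupByKey, reduceByKey, combineByKey and lookup.

Partitioning is configurable as long as the RDD is based on key-value pairs.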
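The rule that identical keys land in the same partition can be pictured with a minimal pure-Python sketch. This is not Spark's implementation: hash_partition and partition_pairs are hypothetical helper names, and crc32 is only a stand-in for whatever hash function a real partitioner uses.

```python
from collections import defaultdict
from zlib import crc32


def hash_partition(key, num_partitions):
    """Map a string key to a partition index (illustrative stand-in only)."""
    return crc32(key.encode("utf-8")) % num_partitions


def partition_pairs(pairs, num_partitions):
    """Group (key, value) pairs into partitions by the hash of the key."""
    partitions = defaultdict(list)
    for key, value in pairs:
        partitions[hash_partition(key, num_partitions)].append((key, value))
    return dict(partitions)


pairs = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)]
partitions = partition_pairs(pairs, num_partitions=3)

# Record which partitions each key ended up in.
key_locations = defaultdict(set)
for index, items in partitions.items():
    for key, _ in items:
        key_locations[key].add(index)

# Each key lives in exactly one partition, so a per-key aggregation
# can run inside that partition without moving data.
print({key: sorted(locs) for key, locs in key_locations.items()})
```

Because the partition index depends only on the key, two records with the same key can never be separated, which is the property that makes a later aggregation by key shuffle-free.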

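### Partitioning guideline
As far as possible, make the number of partitions equal to the number of cores in the cluster.

In local, Standalone, YARN or Mesos mode alike, the default number of partitions can be set with spark.default.parallelism. If it is not set, the value is derived from the cluster environment.

### Setting the number of partitions by hand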

In [None]:
sc.textFile(name, minPartitions=None, use_unicode=True)
sc.parallelize(c, numSlices=None)

## Partitioner(分区器)
![](imgs/partitioner.png)

### Custom Partitioner

In [207]:
def my_partitioner(s):
    return s%5

In [208]:
x = sc.parallelize(range(20), 5)

In [209]:
x.glom().collect()

[[0, 1, 2, 3],
 [4, 5, 6, 7],
 [8, 9, 10, 11],
 [12, 13, 14, 15],
 [16, 17, 18, 19]]

In [214]:
x1 = x.map(lambda s: (s,1)).partitionBy(5, my_partitioner).map(lambda s: s[0])
x1.glom().collect()

[[0, 5, 10, 15],
 [1, 6, 11, 16],
 [2, 7, 12, 17],
 [3, 8, 13, 18],
 [4, 9, 14, 19]]

In [215]:
x1 = x.map(lambda s: (s,1)).partitionBy(6, my_partitioner).map(lambda s: s[0])
x1.glom().collect()

[[0, 5, 10, 15],
 [1, 6, 11, 16],
 [2, 7, 12, 17],
 [3, 8, 13, 18],
 [4, 9, 14, 19],
 []]
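The empty sixth partition above follows from how the bucket is chosen. The sketch below simulates it in plain Python under the assumption that the partition index is partition_func(key) % num_partitions; simulate_partition_by is a hypothetical helper, not a PySpark function.

```python
def my_partitioner(s):
    return s % 5


def simulate_partition_by(keys, num_partitions, partition_func):
    """Put each key in bucket partition_func(key) % num_partitions (assumed rule)."""
    buckets = [[] for _ in range(num_partitions)]
    for key in keys:
        buckets[partition_func(key) % num_partitions].append(key)
    return buckets


five = simulate_partition_by(range(20), 5, my_partitioner)
six = simulate_partition_by(range(20), 6, my_partitioner)
print(five)
print(six)
```

Since my_partitioner only ever returns 0 to 4, asking for 6 partitions leaves index 5 unused. A partitioner should cover the whole range of partition indices if all partitions are meant to receive data.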

## Creating RDDs

### From external files
* Local disk files
* Whole directories, multiple files, wildcards
* Compressed files
* HDFS

In [135]:
sc = spark.sparkContext  # get the context from the SparkSession

In [233]:
# create an RDD by reading a local file
rdd = sc.textFile("data/kv1.txt")
rdd.collect()  # inspect the contents; each line of the file becomes one element

['238\x01val_238', '86\x01val_86', '311\x01val_311']

In [140]:
# create an RDD by reading several local files
rdd = sc.textFile("data/kv1.txt,data/people.txt")  # note: no space after the comma
rdd.collect()  # inspect the contents; each line of the files becomes one element

['238\x01val_238',
 '86\x01val_86',
 '311\x01val_311',
 'Michael, 29',
 'Andy, 30',
 'Justin, 19']

In [141]:
# whole directory, multiple files, wildcards
rdd = sc.textFile("data/*.txt")
rdd.collect()

['Michael, 29',
 'Andy, 30',
 'Justin, 19',
 '238\x01val_238',
 '86\x01val_86',
 '311\x01val_311']

In [None]:
# compressed files are supported
rdd = sc.textFile("f:/test.gz")
rdd.count()  # number of elements in the RDD

### Parallelizing a collection

#### Native data space (local, non-distributed) vs. distributed space
![](imgs/WX20200302-115453.png)

In [128]:
sc = spark.sparkContext

In [127]:
x = [1,2,3,4,5,6]

In [129]:
rdd2=sc.parallelize(x)

In [130]:
rdd2.collect()

[1, 2, 3, 4, 5, 6]

In [131]:
type(rdd2)

pyspark.rdd.RDD

### Creating a child RDD from a parent RDD

## RDD operators
https://www.iteblog.com/archives/1395.html#union

http://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds

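RDDs have two kinds of operators, Transformations and Actions. They differ as follows:
* A Transformation takes an RDD and returns an RDD; an Action takes an RDD and returns some other type.
* Transformations are executed lazily, while Actions are executed immediately.

![](imgs/WX20200302-134715@2x.png)

### Lazy and immediate execution
When the interpreter reaches a Transformation, it does not run it; it only records the operation. The recorded Transformations are executed once an Action is reached.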
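Lazy evaluation can be illustrated without Spark. The sketch below is an analogy using a Python generator, not the RDD machinery: building the generator plays the role of a Transformation, and consuming it plays the role of an Action. traced_square is a hypothetical helper that logs each call.

```python
log = []


def traced_square(x):
    """Record each call so we can see when the work actually happens."""
    log.append(x)
    return x * x


data = [1, 2, 3]

# "Transformation": building the generator only records what to do.
squared = (traced_square(x) for x in data)
calls_before_action = len(log)

# "Action": consuming the generator triggers the computation.
result = list(squared)
calls_after_action = len(log)

print(calls_before_action, calls_after_action, result)  # 0 3 [1, 4, 9]
```

Nothing is computed until the result is requested, which is what lets a framework inspect the whole chain of recorded operations before running any of it.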

## Transformations

### map
![](imgs/map.svg)

In [142]:
# sc = spark context, parallelize creates an RDD from the passed object
x = sc.parallelize([1,2,3])
y = x.map(lambda x: (x,x**2))
 
# collect copies RDD elements to a list on the driver
print(x.collect()) 
print(y.collect())

[1, 2, 3]
[(1, 1), (2, 4), (3, 9)]


### flatMap
![](imgs/flatMap.svg)

In [143]:
x = sc.parallelize([1,2,3])
y = x.flatMap(lambda x: (x, 100*x, x**2))
print(x.collect())
print(y.collect())

[1, 2, 3]
[1, 100, 1, 2, 200, 4, 3, 300, 9]


### mapPartitionsWithIndex
![](imgs/mapPartitionsWithIndex.svg)

In [145]:
# mapPartitionsWithIndex
x = sc.parallelize([1,2,3], 2)
def f(partitionIndex, iterator): yield (partitionIndex,sum(iterator))
y = x.mapPartitionsWithIndex(f)
 
# glom() groups the elements of each partition into a list
print(x.glom().collect())  
print(y.glom().collect())

[[1], [2, 3]]
[[(0, 1)], [(1, 5)]]


### groupByKey
![](imgs/groupByKey.svg)

In [163]:
# groupByKey
x = sc.parallelize([('B',5),('B',4),('A',3),('A',2),('A',1)])
y = x.groupByKey()
y.collect()

[('B', <pyspark.resultiterable.ResultIterable at 0x11c2d94a8>),
 ('A', <pyspark.resultiterable.ResultIterable at 0x11c2d92e8>)]

In [164]:
[(j[0],[i for i in j[1]]) for j in y.collect()]

[('B', [5, 4]), ('A', [3, 2, 1])]

### reduceByKey
![](imgs/reduceByKey.svg)

In [165]:
x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
y = x.reduceByKey(lambda agg, obj: agg + obj)
print(x.collect())
print(y.collect())

[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
[('B', 3), ('A', 12)]


### filter
![](imgs/filter.svg)

In [166]:
# filter
x = sc.parallelize([1,2,3])
y = x.filter(lambda x: x%2 == 1)  # filters out even elements
print(x.collect())
print(y.collect())

[1, 2, 3]
[1, 3]


### glom
Return an RDD created by coalescing all elements within each partition
into a list.
![](imgs/glom.svg)

In [203]:
# glom
x = sc.parallelize(['C','B','A'], 2)
y = x.glom()
print(x.collect()) 
print(y.collect())

['C', 'B', 'A']
[['C'], ['B', 'A']]


### repartition
Return a new RDD that has exactly numPartitions partitions. This can increase or decrease the level of parallelism, and it uses a shuffle to redistribute the data.
![](imgs/repartition.svg)

In [204]:
x = sc.parallelize([1,2,3,4,5],2)
y = x.repartition(numPartitions=3)
print(x.glom().collect())
print(y.glom().collect())

[[1, 2], [3, 4, 5]]
[[], [1, 2], [3, 4, 5]]


### coalesce
Return a new RDD that is reduced into numPartitions partitions.
![](imgs/coalesce.svg)

In [205]:
x = sc.parallelize([1,2,3,4,5],2)
y = x.coalesce(numPartitions=1)
print(x.glom().collect())
print(y.glom().collect())

[[1, 2], [3, 4, 5]]
[[1, 2, 3, 4, 5]]


## Actions

### reduce
![](imgs/reduce.svg)

In [160]:
# reduce
x = sc.parallelize([1,2,3])
y = x.reduce(lambda obj, accumulated: obj + accumulated)  # computes a cumulative sum
print(x.collect())
print(y)

[1, 2, 3]
6


### collect
![](imgs/collect.svg)

In [157]:
# collect
x = sc.parallelize([1,2,3])
y = x.collect()
print(x)  # distributed
print(y)  # not distributed

ParallelCollectionRDD[395] at parallelize at PythonRDD.scala:195
[1, 2, 3]


### take
![](imgs/take.svg)

In [187]:
x = sc.parallelize([1,3,1,2,3])
y = x.take(num = 3)
print(x.collect())
print(y)

[1, 3, 1, 2, 3]
[1, 3, 1]


### first
![](imgs/first.svg)

In [191]:
x = sc.parallelize([1,3,1,2,3])
x.collect()

[1, 3, 1, 2, 3]

In [190]:
x.first()

1

In [189]:
x.take(1)

[1]

### foreach
Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.  
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
![](imgs/foreach.svg)

In [156]:
x = sc.parallelize([1,2,3])
def f(el):
    '''side effect: append the current RDD elements to a file'''
    with open("data/foreachExample.txt", 'a+') as f1:
        print(el,file=f1)

y = x.foreach(f) # writes into foreachExample.txt
 
print(x.collect())
print(y) # foreach returns 'None'
# print the contents of foreachExample.txt
with open("data/foreachExample.txt", "r") as foreachExample:
    print (foreachExample.read())

[1, 2, 3]
None
3
2
1
2
3
1
2
3
1
1
3
2
3
2
1



### sum
Add up the elements in this RDD.
![](imgs/sum.svg)

In [177]:
# sum
x = sc.parallelize([1,3,2])
y = x.sum()
print(x.collect())
print(y)

[1, 3, 2]
6


### reduce
![](imgs/reduce.svg)

In [193]:
x = sc.parallelize([1,2,3])
x.reduce(lambda a, b: a+b)

6

## RDD Persistence
http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. 

When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). 

This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.

In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), replicate it across nodes. These levels are set by passing a StorageLevel object (Scala, Java, Python) to persist(). 


* rdd.persist(storageLevel=StorageLevel(False, True, False, False, 1))  
Returns the RDD itself.
* cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory)

Storage Level
* MEMORY_ONLY
* MEMORY_AND_DISK
* ...

Note: In Python, stored objects will always be serialized with the Pickle library, so it does not matter whether you choose a serialized level. 

The available storage levels in Python include 
* MEMORY_ONLY, 
* MEMORY_ONLY_2, 
* MEMORY_AND_DISK, 
* MEMORY_AND_DISK_2, 
* DISK_ONLY, 
* and DISK_ONLY_2.

Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.

### Which Storage Level to Choose?

Spark’s storage levels are meant to provide different trade-offs between memory usage and CPU efficiency. We recommend going through the following process to select one:

* If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.

* If not, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access. (Java and Scala)

* Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.

* Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.


### Removing Data

Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. 

If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.

In [194]:
rdd.persist()

data/wordcount MapPartitionsRDD[421] at textFile at NativeMethodAccessorImpl.java:0

In [202]:
rdd.persist()

data/wordcount MapPartitionsRDD[421] at textFile at NativeMethodAccessorImpl.java:0

In [199]:
rdd.is_cached

True

In [200]:
rdd.unpersist()

data/wordcount MapPartitionsRDD[421] at textFile at NativeMethodAccessorImpl.java:0

In [201]:
rdd.is_cached

False
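The least-recently-used eviction described under Removing Data can be pictured with a toy cache. This is only an illustration of the LRU idea in plain Python; LRUPartitionCache and its methods are hypothetical names and say nothing about how Spark's block manager is actually built.

```python
from collections import OrderedDict


class LRUPartitionCache:
    """Toy cache that keeps at most `capacity` partitions (hypothetical helper)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._store = OrderedDict()

    def get(self, partition_id):
        if partition_id not in self._store:
            return None  # cache miss: the caller would have to recompute
        self._store.move_to_end(partition_id)  # mark as most recently used
        return self._store[partition_id]

    def put(self, partition_id, data):
        self._store[partition_id] = data
        self._store.move_to_end(partition_id)
        if len(self._store) > self.capacity:
            self._store.popitem(last=False)  # evict the least recently used entry

    def unpersist(self, partition_id):
        self._store.pop(partition_id, None)  # manual removal


cache = LRUPartitionCache(capacity=2)
cache.put("p0", [1, 2])
cache.put("p1", [3, 4])
cache.get("p0")          # p0 becomes the most recently used entry
cache.put("p2", [5, 6])  # capacity exceeded: p1 is evicted
```

A miss in this toy cache corresponds to a dropped partition, which would then be recomputed from its lineage the next time it is needed.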

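## RDD optimization
* Improve data locality to reduce network transfer
    * Co-locate computation and storage: run executors on the same nodes as the HDFS DataNodes and HBase region servers
    * Choose a suitable number of executors
    * Increase the number of data replicas where appropriate
* Columnar storage
    * Analytics often reads only a few columns, so columnar storage reduces disk IO and improves efficiency
    * All values in a column share one data type, which allows compression schemes with very high compression ratios
    * Common columnar formats:
        * ORC
         [Apache *ORC* • High-Performance Columnar Storage for Hadoop](https://orc.apache.org/)
        * parquet
        * HBase column families are also a form of columnar storage
* Repartition after filter
* Data skew
    * A job finishes only when its slowest task finishes, and it succeeds only if every task succeeds
    * Cause of data skew: some keys have far too many values
    * Remedies:
        * Pre-aggregate
        * Tune the parallelism
        * Broadcast the small dataset (suits a join between one large table and one small table)
        * Add a random prefix to the keys of the skewed RDD
        * If only a few keys are skewed, filter them out into a separate RDD first and merge the results afterwards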
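## RDD examples

### Example 1
Prepare your own data and complete the following tasks:
* Extract the age from an ID card number
* Extract the gender from an ID card number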

In [None]:
sc = spark.sparkContext
rdd = sc.textFile("f:/idNO.txt")
# extract the age from the ID number (characters 7-10 are the birth year; 2018 is the reference year)
rdd.map(lambda x: 2018 - int(x[6:10])).collect()
# [31, 35, 34, 33, 32, 71, 61, 51, 41, 21]
# extract the gender from the ID number (17th digit: odd = male, even = female)
def gender(x):
    if x % 2 == 1: return 'male'
    else: return 'female'
rdd.map(lambda x: gender(int(x[-2]))).collect()
# ['female', 'male', 'female', 'male', 'female', 'female', 'female', 'male', 'male', 'female']
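For checking the extraction logic without a Spark session, here is a small plain-Python sketch. It assumes the 18-character resident ID layout in which characters 7 to 10 hold the birth year and the 17th character is odd for male and even for female. parse_id is a hypothetical helper, and the sample number is a widely used illustrative value, not real data.

```python
def parse_id(id_number, reference_year):
    """Return (age, gender) from an 18-character resident ID number."""
    if len(id_number) != 18:
        raise ValueError("expected an 18-character ID number")
    birth_year = int(id_number[6:10])    # characters 7-10: year of birth
    sequence_digit = int(id_number[16])  # 17th character: odd = male, even = female
    gender = "male" if sequence_digit % 2 == 1 else "female"
    return reference_year - birth_year, gender


sample = "11010519491231002X"  # illustrative sample number, not real data
age, gender = parse_id(sample, reference_year=2018)
print(age, gender)  # 69 female
```

Passing the reference year as a parameter avoids hard-coding a year that goes stale; the same function could be handed to rdd.map once it behaves as expected locally.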

### Word count

In [167]:
sc = spark.sparkContext
rdd = sc.textFile("data/wordcount")

In [168]:
rdd.count()

4

In [169]:
rdd.collect()

['1|The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures\ufeff.',
 '2|Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.',
 '3|Apache Storm is a free and open source distrib

In [178]:
import re

In [186]:
rdd1 = rdd.flatMap(lambda s: [(i.lower(), 1) for i in re.split(r'[\||\s+!\.,]', s) if i!=''])
rdd1.collect()

[('1', 1),
 ('the', 1),
 ('apache', 1),
 ('hadoop', 1),
 ('software', 1),
 ('library', 1),
 ('is', 1),
 ('a', 1),
 ('framework', 1),
 ('that', 1),
 ('allows', 1),
 ('for', 1),
 ('the', 1),
 ('distributed', 1),
 ('processing', 1),
 ('of', 1),
 ('large', 1),
 ('data', 1),
 ('sets', 1),
 ('across', 1),
 ('clusters', 1),
 ('of', 1),
 ('computers', 1),
 ('using', 1),
 ('simple', 1),
 ('programming', 1),
 ('models', 1),
 ('it', 1),
 ('is', 1),
 ('designed', 1),
 ('to', 1),
 ('scale', 1),
 ('up', 1),
 ('from', 1),
 ('single', 1),
 ('servers', 1),
 ('to', 1),
 ('thousands', 1),
 ('of', 1),
 ('machines', 1),
 ('each', 1),
 ('offering', 1),
 ('local', 1),
 ('computation', 1),
 ('and', 1),
 ('storage', 1),
 ('rather', 1),
 ('than', 1),
 ('rely', 1),
 ('on', 1),
 ('hardware', 1),
 ('to', 1),
 ('deliver', 1),
 ('high-availability', 1),
 ('the', 1),
 ('library', 1),
 ('itself', 1),
 ('is', 1),
 ('designed', 1),
 ('to', 1),
 ('detect', 1),
 ('and', 1),
 ('handle', 1),
 ('failures', 1),
 ('at', 1),
 (

In [185]:
rdd1.reduceByKey(lambda a, b: a+b).collect()

[('1', 1),
 ('hadoop', 2),
 ('library', 2),
 ('is', 8),
 ('framework', 1),
 ('of', 10),
 ('large', 1),
 ('sets', 1),
 ('clusters', 1),
 ('computers', 1),
 ('using', 1),
 ('simple', 1),
 ('programming', 2),
 ('models.', 1),
 ('designed', 2),
 ('single', 1),
 ('servers', 1),
 ('offering', 1),
 ('computation', 2),
 ('rather', 1),
 ('than', 1),
 ('rely', 1),
 ('hardware', 1),
 ('high-availability,', 1),
 ('handle', 1),
 ('at', 1),
 ('delivering', 1),
 ('service', 1),
 ('computers,', 1),
 ('may', 1),
 ('prone', 1),
 ('general-purpose', 1),
 ('provides', 1),
 ('high-level', 2),
 ('in', 2),
 ('java,', 1),
 ('python', 1),
 ('r,', 1),
 ('an', 1),
 ('optimized', 1),
 ('engine', 1),
 ('supports', 2),
 ('execution', 1),
 ('set', 1),
 ('tools', 1),
 ('processing,', 2),
 ('machine', 1),
 ('learning,', 1),
 ('graph', 1),
 ('storm', 3),
 ('free', 1),
 ('open', 1),
 ('source', 1),
 ('process', 1),
 ('unbounded', 1),
 ('streams', 1),
 ('used', 1),
 ('use!', 1),
 ('4', 1),
 ('below', 1),
 ('overview', 1)
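The same tokenize-then-count logic can be tried in plain Python before running it on an RDD. The sketch below uses shortened, made-up input lines in the same id|text layout and a slightly simplified split pattern; tokenize is a hypothetical helper. Note that tokens such as 'models.' in the output above still carry punctuation, which suggests that cell was run with an earlier split pattern.

```python
import re
from collections import Counter


def tokenize(line):
    """Split on |, whitespace, !, . and , then lowercase and drop empty tokens."""
    return [tok.lower() for tok in re.split(r"[|\s!.,]+", line) if tok]


# Shortened illustrative lines, not the contents of data/wordcount.
lines = [
    "1|The Apache Hadoop software library is a framework.",
    "2|Apache Spark is a fast and general-purpose cluster computing system.",
]
counts = Counter(tok for line in lines for tok in tokenize(line))
print(counts["apache"], counts["is"], counts["general-purpose"])  # 2 2 1
```

Counter plays the role of reduceByKey here: it sums a 1 for every occurrence of each token.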

# spark sql

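## The history of Spark SQL
### HIVE  
In essence, Hive translates SQL statements into MapReduce programs.  
![](imgs/hive1.png)
Before Spark SQL appeared, Hive used HDFS for storage and MapReduce as its execution engine, and provided the Metastore. Together with Hive's parser and query optimizer, this was the de facto standard solution for SQL on Hadoop.

Note that **what Hive itself contributes is the parser and the query optimizer, i.e. the Client part in the figure below**.
![](imgs/hive.png)

### SHARK
Early Spark mainly offered MapReduce-style computation. A natural idea was therefore to replace MapReduce with Spark as Hive's execution engine. That became Shark, the predecessor of Spark SQL.

An immediate problem with Shark was its heavy dependence on Hive, which held back its own upgrades and releases.
![](imgs/shark.png)

### SPARK SQL
Spark SQL is now fully separated from Hive while remaining compatible with it (SQLContext runs standard SQL, HiveContext runs Hive SQL). The Metastore is no longer required, and data sources other than HDFS are supported, such as RDBMSs, JSON files and Parquet files.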
![](imgs/spark.png)

## DataFrame

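### Why did Spark introduce the DF/DS API?
Spark outperforms MapReduce in many cases, but that does not mean any casually written Spark program beats someone else's MapReduce program, because many places need optimizing.
Hence a natural idea: provide the DF/DS API so that, however the business logic is written, it can be optimized inside DF/DS. Optimization becomes the framework's job rather than the user's.
The figure below shows Spark's optimization process. Note that the final result of the optimization is still RDDs.
![](imgs/df优化过程.png)

Spark SQL optimizes in these steps: build a logical plan, optimize the logical plan, generate several physical plans, pick the best physical plan with the cost optimizer, then generate the code that actually runs.
![](imgs/df1.png)

Another benefit of the DataFrame API is that it is language independent. As the figure below shows, performance is about the same whether you use SQL, R, Python or Java/Scala, and far better than programs written directly against the RDD API.
With the RDD API, a program written in Java produces a JavaRDD and one written in Python produces a PythonRDD; with the DataFrame API, the underlying RDD is the same in every language.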

![](imgs/df2.png)

### DataFrame vs RDD

![](imgs/df3.png)

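* An RDD is a list of Java objects whose internal structure Spark does not know  
    Row-oriented storage.
    RDD<Person> is typed as Person, but Spark does not understand the internal structure of Person: which attributes it has, or their names and types.
    RDDs emphasize immutability and statelessness, which suits functional programming.
    Data reuse is weak, though. Changing a single attribute of a Person in a map lambda still creates a new Person object, so a chain of RDD transformations creates many intermediate objects and puts pressure on the GC.
    * Pros:
    1) Compile-time type safety: type errors are caught at compile time.
    2) Object-oriented style: data is accessed directly through object attributes.
    * Cons:
    1) Serialization and deserialization overhead: both cluster communication and IO must serialize and deserialize the structure as well as the data of each object.
    2) GC overhead: frequently creating and destroying objects inevitably adds GC work.
    RDDs are read-only. The benefit is that concurrent modification is not a concern, which allows higher throughput.

* DataFrame
    Structured storage in which every column has a type.
    DataFrame is equivalent to Dataset<Row>.
    A DataFrame carries structural information about the data, i.e. the schema.
    This detailed schema tells Spark SQL exactly which columns the dataset contains and the name and type of each one.
    Every row is structured, e.g. Name is a String, Age is an Int, Height is a Double.
    
DataFrame core features:  
* Schema: describes the columns of every row (ROW) of data. With the schema Spark can interpret the data, so communication and IO only need to serialize and deserialize the data; the structure can be left out.
* off-heap memory: Spark can serialize the data (without the structure) in binary form into off-heap memory and operate on it there directly.
* Tungsten: the new execution engine.
* Catalyst: the new query parsing and optimization framework.
* Pros:  
    Off-heap memory is like Spark's own territory and the schema is like a map; with both, Spark can manage the data itself, is no longer constrained by the JVM, and no longer suffers from GC. With schema and off-heap memory, DataFrame addresses the drawbacks of RDD: higher computational efficiency, less data read, and optimization of the underlying computation.
    DataFrame offers richer operators than RDD.
* Cons:  
    DataFrame fixes the drawbacks of RDD but loses its advantages: it is not type safe, and its API is not object-oriented.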

In [None]:
# the API is not object-oriented
idAgeDF.filter(idAgeDF["age"] > 25)
# no error is raised here: DataFrame is not compile-time type safe
idAgeDF.filter(idAgeDF["age"] > "")

# SparkSession API

## Starting Point: SparkSession
http://spark.apache.org/docs/latest/sql-getting-started.html

The entry point into all functionality in Spark is the SparkSession class. To create a basic SparkSession, just use SparkSession.builder:

### Creating a SparkSession object

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# DataFrame API

Untyped Dataset Operations (aka DataFrame Operations)

DataFrames provide a domain-specific language for structured data manipulation in Scala, Java, Python and R.

As mentioned above, in Spark 2.0, __DataFrames are just Dataset of Rows in Scala and Java API__. These operations are also referred as “untyped transformations” in contrast to “typed transformations” come with strongly typed Scala/Java Datasets.

Here we include some basic examples of structured data processing using Datasets:

In Python, it’s possible to access a DataFrame’s columns either by attribute (df.age) or by indexing (df['age']). While the former is convenient for interactive data exploration, users are highly encouraged to use the latter form, which is future proof and won’t break with column names that are also attributes on the DataFrame class.



## Running SQL Queries Programmatically
A DataFrame can be operated on using relational transformations and can also be used to create a temporary view. 

Registering a DataFrame as a temporary view allows you to run SQL queries over its data. 


The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame.

In [10]:
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



## Column API

A Column is comparable to a Series in pandas.

### Creating Columns

In [None]:
# 1. Select a column out of a DataFrame

df.colName
df["colName"]

# 2. Create from an expression
df.colName + 1
1 / df.colName

### desc() | asc()

### alias(str) | name(str)
name() is an alias for alias().

### getField(name)[source]

    An expression that gets a field by name in a StructField.

In [423]:
from pyspark.sql import Row

df = spark.createDataFrame([Row(r=Row(a=1, b="b"))])

df.show()

+------+
|     r|
+------+
|[1, b]|
+------+



In [424]:
df.select(df.r.getField("b").alias('bb')).show()

+---+
| bb|
+---+
|  b|
+---+



### getItem(key)[source]

    An expression that gets an item at position ordinal out of a list, or gets an item by key out of a dict.

In [413]:
df = spark.createDataFrame([([1, 2], {"key": "value"})], ["l", "d"])

df.select(df.l.getItem(0), df.d.getItem("key")).show()

+----+------+
|l[0]|d[key]|
+----+------+
|   1| value|
+----+------+



In [415]:
df.select(df.l[0].alias('dd'), df.d["key"]).show()

+---+------+
| dd|d[key]|
+---+------+
|  1| value|
+---+------+



### isNotNull() |  isNull()

In [4]:
from pyspark.sql import Row

df = spark.createDataFrame([Row(name='Tom', height=80), Row(name='Alice', height=None)])

df.filter(df.height.isNotNull()).show()

+------+----+
|height|name|
+------+----+
|    80| Tom|
+------+----+



In [6]:
df[df.height.isNotNull()].show()

+------+----+
|height|name|
+------+----+
|    80| Tom|
+------+----+



In [7]:
df[df.height.isNull()].show()

+------+-----+
|height| name|
+------+-----+
|  null|Alice|
+------+-----+



### isin(*cols)

In [417]:
df[df.name.isin("Bob", "Mike")].show()

+------+----+
|height|name|
+------+----+
+------+----+



In [8]:
df[df.height.isin(80)].show()

+------+----+
|height|name|
+------+----+
|    80| Tom|
+------+----+



In [9]:
# note: rows whose height is null are filtered out as well, because comparing null yields null
df[df.height.isin(80)==False].show()

+------+----+
|height|name|
+------+----+
+------+----+
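The empty result above comes from SQL's three-valued logic: a comparison involving null is neither true nor false, and a filter keeps only rows where the condition is true. The plain-Python sketch below mimics that behaviour with None standing in for null; sql_isin, sql_filter and not_in_80 are hypothetical helpers, not PySpark functions.

```python
def sql_isin(value, *candidates):
    """Return True/False, or None (SQL NULL) when the value itself is None."""
    if value is None:
        return None
    return value in candidates


def sql_filter(rows, predicate):
    """Keep only rows whose predicate is exactly True, as a WHERE clause does."""
    return [row for row in rows if predicate(row) is True]


def not_in_80(row):
    """Mimic `height.isin(80) == False`, propagating NULL."""
    result = sql_isin(row["height"], 80)
    return None if result is None else (result is False)


rows = [{"name": "Tom", "height": 80}, {"name": "Alice", "height": None}]
kept = sql_filter(rows, not_in_80)
print(kept)  # []
```

To keep the rows with a null height in PySpark, one option is to handle null explicitly, for example df[df.height.isNull() | ~df.height.isin(80)].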



### String methods

#### startswith(other) | endswith(other)

In [None]:
df.filter(df.name.endswith('ice')).collect()
[Row(age=2, name='Alice')]

df.filter(df.name.endswith('ice$')).collect()
[]

#### like(other) |  rlike(other)

    SQL like expression. Returns a boolean Column based on a SQL LIKE match.

In [418]:
df.filter(df.name.like('Al%')).collect()

[Row(height=None, name='Alice')]

In [420]:
df.filter(df.name.rlike('ice$')).collect()

[Row(height=None, name='Alice')]

In [421]:
df.filter(df.name.rlike(r'ice$')).collect()

[Row(height=None, name='Alice')]

#### substr(startPos, length)[source]

    Return a Column which is a substring of the column.

In [422]:
df.select(df.name.substr(1, 3).alias("col")).collect()

[Row(col='Tom'), Row(col='Ali')]

## Creating DataFrames

### Creating a DataFrame programmatically

#### rdd

In [320]:
from pyspark.sql.types import *
from pyspark.sql import Row

In [323]:
# column definitions (table header)
fields = [
    StructField('name', StringType(), nullable=True),
    StructField('age', IntegerType(), nullable=True),
]

In [324]:
schema = StructType(fields)

In [325]:
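# table rows: split each line and cast age to int so that it matches IntegerType
lines = sc.textFile('data/people.txt').map(lambda s: [i.strip() for i in s.split(',')]).map(lambda p: (p[0], int(p[1])))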

In [326]:
df = spark.createDataFrame(lines, schema)

In [327]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



In [None]:
spark.createDataFrame(
    data,
    schema=None,
    samplingRatio=None,
    verifySchema=True,
)

data: RDD, list, or pd.DataFrame

#### list

In [377]:
df = spark.createDataFrame([('a',1),
                            ('b',2),
                            ('c',3),
                            ('d',4)], ["words", "num"])
df.show()

+-----+---+
|words|num|
+-----+---+
|    a|  1|
|    b|  2|
|    c|  3|
|    d|  4|
+-----+---+



#### pd.DataFrame

In [10]:
import pandas as pd

In [11]:
dfp = pd.DataFrame({'a':[1, 2, 4], 'b': list('abc')})

In [12]:
spark.createDataFrame(dfp).show()

+---+---+
|  a|  b|
+---+---+
|  1|  a|
|  2|  b|
|  4|  c|
+---+---+



In [13]:
# or, slightly more involved: convert dfp to a list first, then to a Spark DataFrame
spark.createDataFrame(dfp.values.tolist(), dfp.columns.tolist()).show()

+---+---+
|  a|  b|
+---+---+
|  1|  a|
|  2|  b|
|  4|  c|
+---+---+



### Data Sources
Spark SQL supports operating on a variety of data sources through the DataFrame interface. 

This section describes the general methods for loading and saving data using the Spark Data Sources and then goes into specific options that are available for the built-in data sources.

Spark offers two ways to read from a data source:
* General form: spark.read.load(path=None, format=None, schema=None, **options)
* Shorthand: spark.read.{format}(*paths)

Likewise, there are two ways to write to a data source:
* General form  
    df.write.save(
        path=None,
        format=None,
        mode=None,
        partitionBy=None,
        **options,
    )
* Shorthand
df.write.{format}(path, mode=None, partitionBy=None, compression=None)
    * path: note that path is the name of a directory; files in the chosen format are created inside it.  
    * param mode: specifies the behavior of the save operation when data already exists.

        * ``append``: Append contents of this :class:`DataFrame` to existing data.
        * ``overwrite``: Overwrite existing data.
        * ``ignore``: Silently ignore this operation if data already exists.
        * ``error`` or ``errorifexists`` (default case): Throw an exception if data already exists.

### parquet
In the simplest form, the default data source (parquet unless otherwise configured by spark.sql.sources.default) will be used for all operations.

#### load

In [14]:
df = spark.read.load("data/users.parquet")

In [290]:
spark.read.parquet("data/users.parquet")

DataFrame[name: string, favorite_color: string, favorite_numbers: array<int>]

In [15]:
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



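#### Reading in batches
Sometimes the data is too large and reading it all at once causes an OOM, so the parquet files in a directory have to be read in batches.

Approach: use wildcards.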

In [None]:
df = spark.read.parquet('file:///opt/dataProcessResult/hz/register_merge/part-000*.parquet')

#### save
:param mode: specifies the behavior of the save operation when data already exists.

    * ``append``: Append contents of this :class:`DataFrame` to existing data. Note: in append mode, Spark makes sure the file names inside the directory do not collide.
    * ``overwrite``: Overwrite existing data.
    * ``ignore``: Silently ignore this operation if data already exists.
    * ``error`` or ``errorifexists`` (default case): Throw an exception if data already exists.

In [18]:
df.select("name", "favorite_color").write.save("data/namesAndFavColors.parquet", mode='append')

In [298]:
df.select("name", "favorite_color").write.csv("data/namesAndFavColors", mode='ignore')

### json

In [4]:
# spark is an existing SparkSession
df = spark.read.json("data/people.json")
# Displays the content of the DataFrame to stdout
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [303]:
df = spark.read.load("data/people.json", format="json")
# df.select("name", "age").write.save("data/namesAndAges.parquet", format="parquet")

In [304]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



### csv

In [223]:
df = spark.read.load("data/people.csv",
                     format="csv", sep=";", inferSchema="true", header="true")

In [249]:
df = spark.read.csv("data/people.csv", sep=";", inferSchema="true", header="true")

In [250]:
df.show()

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+



### Connecting Spark SQL to a MySQL database
A MySQL table can be read directly into a DataFrame.

#### Preparation
Download the MySQL JDBC driver jar and put it in $SPARK_HOME/jars/.

In [333]:
# jdbcDF = spark.read\
#         .format('jdbc')\
#         .option('driver', 'com.mysql.jdbc.Driver')\
#         .option('url', 'jdbc:mysql://localhost:3306/dics')\
#         .option('dbtable', 'disease_dic')\
#         .option('user', 'root')\
#         .option('password', '123456').load()

### Run SQL on files directly

In [236]:
df = spark.sql("SELECT * FROM parquet.`data/users.parquet`")

df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



### Converting between RDD and DataFrame

#### DataFrame to RDD
Each row of the DataFrame is a Row object and becomes one element of the RDD.

In [230]:
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [231]:
df.rdd.collect()

[Row(name='Alyssa', favorite_color=None, favorite_numbers=[3, 9, 15, 20]),
 Row(name='Ben', favorite_color='red', favorite_numbers=[])]

In [232]:
df.rdd.map(lambda s: s.name).collect()

['Alyssa', 'Ben']

#### RDD to DataFrame

In [237]:
rdd = df.rdd

In [238]:
rdd.collect()

[Row(name='Alyssa', favorite_color=None, favorite_numbers=[3, 9, 15, 20]),
 Row(name='Ben', favorite_color='red', favorite_numbers=[])]

In [240]:
dfr = rdd.toDF()

In [243]:
dfr = spark.createDataFrame(rdd)

In [244]:
dfr.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [312]:
rdd = sc.textFile("data/people.txt").map(lambda s: s.split(','))
rdd.collect()

[['Michael', ' 29'],
 ['Andy', ' 30'],
 ['Justin', ' 19'],
 ['Michael', ' 29'],
 ['Andy1', ' 30'],
 ['Justin', ' 19']]

Note: no schema was specified, so every column is a string.

In [314]:
rdd.toDF().show()

+-------+---+
|     _1| _2|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
|Michael| 29|
|  Andy1| 30|
| Justin| 19|
+-------+---+



#### best practice

In [315]:
from pyspark.sql import Row

In [319]:
rdd1 = rdd.map(lambda s: Row(name=s[0], age=int(s[1].strip())))
rdd1.toDF().show()

+---+-------+
|age|   name|
+---+-------+
| 29|Michael|
| 30|   Andy|
| 19| Justin|
| 29|Michael|
| 30|  Andy1|
| 19| Justin|
+---+-------+



## DataFrame attributes

### count()

In [259]:
df.count()

3

### dtypes

In [260]:
df.dtypes

[('age', 'bigint'), ('name', 'string')]

In [261]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



### columns

In [262]:
df.columns

['age', 'name']

## Running SQL Queries Programmatically
A DataFrame can be operated on using relational transformations and can also be used to create a temporary view. 

Registering a DataFrame as a temporary view allows you to run SQL queries over its data. 


The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame.

In [10]:
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



## DataFrame Transformations

### Adding a column to a DataFrame

#### Adding a constant column

 pyspark.sql.functions.lit(col)

    Creates a Column of literal value.

In [25]:
from pyspark.sql.functions import lit

In [26]:
df.select(lit(5).alias('height')).withColumn('spark_user', lit(True)).show()

+------+----------+
|height|spark_user|
+------+----------+
|     5|      true|
|     5|      true|
|     5|      true|
+------+----------+



In [27]:
df = df.withColumn('height', lit(5))
df.show()

+----+-------+--------------------+------+
| age|   name|                   a|height|
+----+-------+--------------------+------+
|null|Michael|[M, i, c, h, a, e...|     5|
|  30|   Andy|      [A, n, d, y, ]|     5|
|  19| Justin|[J, u, s, t, i, n, ]|     5|
+----+-------+--------------------+------+



In [28]:
df = df.withColumn('height', lit('hello'))
df.show()

+----+-------+--------------------+------+
| age|   name|                   a|height|
+----+-------+--------------------+------+
|null|Michael|[M, i, c, h, a, e...| hello|
|  30|   Andy|      [A, n, d, y, ]| hello|
|  19| Justin|[J, u, s, t, i, n, ]| hello|
+----+-------+--------------------+------+



In [29]:
df = df.withColumn('height', lit(True))
df.show()

+----+-------+--------------------+------+
| age|   name|                   a|height|
+----+-------+--------------------+------+
|null|Michael|[M, i, c, h, a, e...|  true|
|  30|   Andy|      [A, n, d, y, ]|  true|
|  19| Justin|[J, u, s, t, i, n, ]|  true|
+----+-------+--------------------+------+



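#### Adding a column derived from existing columns
If the column name does not exist, a new column is added;

if it already exists, the column is overwritten.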

In [22]:
from pyspark.sql.functions import split

In [23]:
df = df.withColumn('a', split(df.name, ''))

In [24]:
df.show()

+----+-------+--------------------+
| age|   name|                   a|
+----+-------+--------------------+
|null|Michael|[M, i, c, h, a, e...|
|  30|   Andy|      [A, n, d, y, ]|
|  19| Justin|[J, u, s, t, i, n, ]|
+----+-------+--------------------+



### Dropping columns

In [370]:
df.show()

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
|  Ian| 33| Engineer|
+-----+---+---------+



In [371]:
df.drop('age').show()

+-----+---------+
| name|      job|
+-----+---------+
|Jorge|Developer|
|  Bob|Developer|
|  Ian| Engineer|
+-----+---------+



In [372]:
df.drop('age', 'job').show()

+-----+
| name|
+-----+
|Jorge|
|  Bob|
|  Ian|
+-----+



In [374]:
# Error: col should be a string or a Column
df.drop(['age', 'job']).show()

### Dropping duplicate rows
dropDuplicates(subset=None)

In [375]:
from pyspark.sql import Row
df = sc.parallelize([ \
    Row(name='Alice', age=5, height=80), \
    Row(name='Alice', age=5, height=80), \
    Row(name='Alice', age=10, height=80)]).toDF()
df.dropDuplicates().show()

+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+



In [376]:
df.dropDuplicates(['name', 'height']).show()

+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+



### withColumnRenamed(existing, new)

In [257]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [258]:
df.withColumnRenamed('age', 'age1').show()

+----+-------+
|age1|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



### Type casting

In [263]:
df.dtypes

[('age', 'bigint'), ('name', 'string')]

In [264]:
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType
df = df.withColumn('ages', col('age').cast(StringType()))

In [266]:
df.dtypes

[('age', 'bigint'), ('name', 'string'), ('ages', 'string')]

### df.dropna(how='any', thresh=None, subset=None)

In [306]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [307]:
df.dropna().show()

+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+



In [308]:
df.dropna(how='all').show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
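The two outputs above differ because `how='any'` drops a row as soon as one value is null, while `how='all'` drops it only when every value is null. The following is a plain-Python model of those rules, written only to make the semantics concrete. `keep_row` is a hypothetical helper, not part of PySpark, and it assumes the documented behavior that `thresh`, when given, overrides `how`.

```python
def keep_row(row, how="any", thresh=None, subset=None):
    """Return True if dropna would keep this row (plain-Python model)."""
    cols = subset if subset is not None else list(row)
    non_null = sum(row[c] is not None for c in cols)
    if thresh is not None:      # thresh overrides how: keep rows with >= thresh non-null values
        return non_null >= thresh
    if how == "any":            # drop the row if any value is null
        return non_null == len(cols)
    return non_null > 0         # how == "all": drop only if every value is null


rows = [
    {"age": None, "name": "Michael"},
    {"age": 30, "name": "Andy"},
    {"age": 19, "name": "Justin"},
]
kept_any = [r["name"] for r in rows if keep_row(r, how="any")]
kept_all = [r["name"] for r in rows if keep_row(r, how="all")]
print(kept_any)  # ['Andy', 'Justin']
print(kept_all)  # ['Michael', 'Andy', 'Justin']
```

Michael's row has one null and one non-null value, so it is dropped under `any` and kept under `all`, which matches the two tables.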



In [309]:
df.age

Column<b'age'>

In [310]:
df['age']

Column<b'age'>

## Viewing Data & Selection

### printSchema()

In [220]:
# Print the schema in a tree format
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



### select(*cols)

In [283]:
df.select('_c0').show()

+-------+
|    _c0|
+-------+
|Michael|
|   Andy|
| Justin|
|Michael|
|  Andy1|
| Justin|
+-------+



In [301]:
df.select('_c0', '_c0', '_c1').show()

+-------+-------+---+
|    _c0|    _c0|_c1|
+-------+-------+---+
|Michael|Michael| 29|
|   Andy|   Andy| 30|
| Justin| Justin| 19|
|Michael|Michael| 29|
|  Andy1|  Andy1| 30|
| Justin| Justin| 19|
+-------+-------+---+



In [302]:
df.select(['_c0', '_c0', '_c1']).show()

+-------+-------+---+
|    _c0|    _c0|_c1|
+-------+-------+---+
|Michael|Michael| 29|
|   Andy|   Andy| 30|
| Justin| Justin| 19|
|Michael|Michael| 29|
|  Andy1|  Andy1| 30|
| Justin| Justin| 19|
+-------+-------+---+



In [284]:
df.select(df['_c0']).show()

+-------+
|    _c0|
+-------+
|Michael|
|   Andy|
| Justin|
|Michael|
|  Andy1|
| Justin|
+-------+



In [289]:
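# Note: `+` is arithmetic addition, so the strings are cast to numbers and every row becomes null;
# to append text, use concat(df['_c0'], lit('s')) from pyspark.sql.functions instead
df.select(df['_c0']+'s').show()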

+---------+
|(_c0 + s)|
+---------+
|     null|
|     null|
|     null|
|     null|
|     null|
|     null|
+---------+



In [287]:
df.select(df['_c0'], df['_c0'], df['_c1']).show()

+-------+-------+---+
|    _c0|    _c0|_c1|
+-------+-------+---+
|Michael|Michael| 29|
|   Andy|   Andy| 30|
| Justin| Justin| 19|
|Michael|Michael| 29|
|  Andy1|  Andy1| 30|
| Justin| Justin| 19|
+-------+-------+---+



### show(n=20, truncate=True, vertical=False)

In [221]:
# Select only the "name" column
df.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [222]:
# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



### head(n=1)

In [343]:
df.head()

Row(name='Jorge', age=30, job='Developer')

In [345]:
df.head(2)

[Row(name='Jorge', age=30, job='Developer'),
 Row(name='Bob', age=32, job='Developer')]

### first()

In [344]:
df.first()

Row(name='Jorge', age=30, job='Developer')

### take(n)

In [346]:
df.take(2)

[Row(name='Jorge', age=30, job='Developer'),
 Row(name='Bob', age=32, job='Developer')]

### df.filter(condition)

In [254]:
# Select people older than 21
df.filter(df['age'] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



### df.sort(*cols, **kwargs)

In [366]:
df.sort('age').show()

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
|  Ian| 33| Engineer|
+-----+---+---------+



In [365]:
df.sort(df['age'].desc()).show()

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|  Ian| 33| Engineer|
|  Bob| 32|Developer|
|Jorge| 30|Developer|
+-----+---+---------+



In [256]:
df.sort(df['age'].desc(), df['name'].asc()).show()

+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|  19| Justin|
|null|Michael|
+----+-------+



### distinct()

In [6]:
df = spark.read.csv('data/people.txt')

In [278]:
df.dtypes

[('_c0', 'string'), ('_c1', 'string')]

In [7]:
df.show()

+-------+---+
|    _c0|_c1|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
|Michael| 29|
|  Andy1| 30|
| Justin| 19|
+-------+---+



In [280]:
df.select('_c1').distinct().show()

+---+
|_c1|
+---+
| 29|
| 19|
| 30|
+---+



In [282]:
df1 = df.select('_c1','_c0')
df1.distinct().show()

+---+-------+
|_c1|    _c0|
+---+-------+
| 30|   Andy|
| 29|Michael|
| 30|  Andy1|
| 19| Justin|
+---+-------+



### df.limit(num)
Takes the first n rows and returns a DataFrame.

In [348]:
df.limit(2).show()

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+



In [402]:
df.show()

+---+----+
| id|   v|
+---+----+
|  1| 1.0|
|  1| 2.0|
|  2| 3.0|
|  2| 5.0|
|  2|10.0|
+---+----+



### df.sample(withReplacement=None, fraction=None, seed=None)
This is not guaranteed to provide exactly the fraction specified of the total count of the given :class:`DataFrame`.

In [409]:
df.sample(fraction=0.4).show()

+---+----+
| id|   v|
+---+----+
|  1| 2.0|
|  2|10.0|
+---+----+



## Statistics

### df.groupBy(*cols)

In [5]:
# Count people by age
df.groupBy("age").count().show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



### df.agg(*exprs)
Aggregate on the entire :class:`DataFrame` without groups
(shorthand for ``df.groupBy().agg()``).

In [378]:
df.show()

+-----+---+
|words|num|
+-----+---+
|    a|  1|
|    b|  2|
|    c|  3|
|    d|  4|
+-----+---+



In [383]:
from pyspark.sql.functions import max as pyspark_max, avg
df.agg(pyspark_max("num"), avg("num")).show()

+--------+--------+
|max(num)|avg(num)|
+--------+--------+
|       4|     2.5|
+--------+--------+



In [385]:
df.groupBy().agg(pyspark_max("num"), avg("num")).show()

+--------+--------+
|max(num)|avg(num)|
+--------+--------+
|       4|     2.5|
+--------+--------+



In [384]:
df.agg({"num": "max", "num":"avg"}).show()

+--------+
|avg(num)|
+--------+
|     2.5|
+--------+
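Only `avg(num)` comes back because the dict literal repeats the key `"num"`. Python keeps a single entry per key, so the `"max"` entry is gone before `agg` ever sees the dict. The snippet below shows just that Python behavior; `spec` is an illustrative name.

```python
# A dict literal with a repeated key keeps only the last value for that key
spec = {"num": "max", "num": "avg"}
print(spec)       # {'num': 'avg'}
print(len(spec))  # 1
```

To get several aggregates of the same column, pass column expressions as in the earlier cell: `df.agg(pyspark_max("num"), avg("num"))`.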



### dfg.apply(udf)
Docstring:
Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result
as a `DataFrame`.


In [8]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(

    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],

    ("id", "v"))
df.show()

+---+----+
| id|   v|
+---+----+
|  1| 1.0|
|  1| 2.0|
|  2| 3.0|
|  2| 5.0|
|  2|10.0|
+---+----+



In [9]:
# !pip install PyArrow
# !pip freeze | grep pyarrow  # pyarrow==0.16.0

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)  
def normalize(pdf):

    v = pdf.v

    return pdf.assign(v=(v - v.mean()) / v.std())

In [10]:
dfg = df.groupby("id")
dfg.apply(normalize)

DataFrame[id: bigint, v: double]

## Combining DataFrames

### Intersection: df.intersect(other)
Return a new :class:`DataFrame` containing rows only in
both this frame and another frame.

In [351]:
df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df1.show()

+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  b|  3|
|  c|  4|
+---+---+



In [352]:
df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
df2.show()

+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  b|  3|
+---+---+



In [354]:
df1.intersect(df2).sort("C1", "C2").show()

+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  b|  3|
+---+---+



### df1.intersectAll(other)
Return a new :class:`DataFrame` containing rows in both this dataframe and other
dataframe while preserving duplicates.

In [353]:
df1.intersectAll(df2).sort("C1", "C2").show()

+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  b|  3|
+---+---+



### Difference: df1.exceptAll(other)
Return a new :class:`DataFrame` containing rows in this :class:`DataFrame` but
not in another :class:`DataFrame` while preserving duplicates.

This is equivalent to `EXCEPT ALL` in SQL.

In [355]:
df1.exceptAll(df2).show()

+---+---+
| C1| C2|
+---+---+
|  c|  4|
+---+---+
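The difference between `intersect`, `intersectAll` and `exceptAll` is the difference between set and multiset operations. A plain-Python sketch with the same rows as `df1` and `df2` reproduces the three results above; it models the semantics with `set` and `collections.Counter` and is not how Spark computes them.

```python
from collections import Counter

rows1 = [("a", 1), ("a", 1), ("b", 3), ("c", 4)]  # same rows as df1
rows2 = [("a", 1), ("a", 1), ("b", 3)]            # same rows as df2

# intersect: distinct rows present in both
intersect = sorted(set(rows1) & set(rows2))
# intersectAll: multiset intersection, each row kept min(count1, count2) times
intersect_all = sorted((Counter(rows1) & Counter(rows2)).elements())
# exceptAll: multiset difference, each row kept count1 - count2 times
except_all = sorted((Counter(rows1) - Counter(rows2)).elements())

print(intersect)      # [('a', 1), ('b', 3)]
print(intersect_all)  # [('a', 1), ('a', 1), ('b', 3)]
print(except_all)     # [('c', 4)]
```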



### df1.crossJoin(other)
Returns the Cartesian product with another :class:`DataFrame`.

In [356]:
df = spark.read.csv("data/people.csv", sep=";", inferSchema="true", header="true")
df.show()

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
|  Ian| 33| Engineer|
+-----+---+---------+



In [362]:
df1 = df.select('name', 'age')
df2 = df.select('name', 'job').limit(2)

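In [342]:
# Note: the output below comes from an earlier run (In [342]) in which df2 still held all three rows;
# with df2 limited to 2 rows as in In [362], the product has 3 x 2 = 6 rows
df1.crossJoin(df2).show()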

+-----+---+-----+---------+
| name|age| name|      job|
+-----+---+-----+---------+
|Jorge| 30|Jorge|Developer|
|  Bob| 32|Jorge|Developer|
|  Ian| 33|Jorge|Developer|
|Jorge| 30|  Bob|Developer|
|  Bob| 32|  Bob|Developer|
|  Ian| 33|  Bob|Developer|
|Jorge| 30|  Ian| Engineer|
|  Bob| 32|  Ian| Engineer|
|  Ian| 33|  Ian| Engineer|
+-----+---+-----+---------+



### df1.join(other, on=None, how=None)
* param on: 
    * a string for the join column name, 
    * a list of column names,
    * a join expression (Column), 
    * or a list of Columns.
    
If `on` is a string or a list of strings indicating the name of the join column(s),
    the column(s) must exist on both sides, and this performs an equi-join.

In [364]:
df1.join(df2, 'name', how='inner').show()

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+



In [363]:
df1.join(df2, df1.name==df2.name, how='inner').show()

+-----+---+-----+---------+
| name|age| name|      job|
+-----+---+-----+---------+
|Jorge| 30|Jorge|Developer|
|  Bob| 32|  Bob|Developer|
+-----+---+-----+---------+



### df.union(other)
Return a new :class:`DataFrame` containing union of rows in this and another frame.

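Note: both DataFrames must have the same number of columns, otherwise an error is raised. Columns are matched by position, not by name, which is why `job` values end up under `age` below.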

In [367]:
df1.union(df2).show()

+-----+---------+
| name|      age|
+-----+---------+
|Jorge|       30|
|  Bob|       32|
|  Ian|       33|
|Jorge|Developer|
|  Bob|Developer|
+-----+---------+



In [369]:
df.union(df1).show()
#Py4JJavaError: An error occurred while calling o3619.union.
#: org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the first table has 3 columns and the second table has 2 columns;

## Built-in functions: pyspark.sql.functions

### str

#### split
Splits str around pattern (pattern is a regular expression).

In [14]:
from pyspark.sql.functions import split

In [5]:
df = spark.createDataFrame([('ab12cd',)], ['s',])
df.show()

+------+
|     s|
+------+
|ab12cd|
+------+



In [18]:
df.select(split(df.s, '[0-9]+')).show()

+----------------+
|split(s, [0-9]+)|
+----------------+
|        [ab, cd]|
+----------------+



In [19]:
df.select(split(df.s, '[0-9]+').alias('s')).show()

+--------+
|       s|
+--------+
|[ab, cd]|
+--------+



In [20]:
df.withColumn('x_list', split(df.s, '[0-9]+')).show()

+------+--------+
|     s|  x_list|
+------+--------+
|ab12cd|[ab, cd]|
+------+--------+



#### upper | lower

In [6]:
from pyspark.sql.functions import upper, lower

In [8]:
df.select(upper(df['s'])).show()

+--------+
|upper(s)|
+--------+
|  AB12CD|
+--------+



In [9]:
df.withColumn('s_upper', upper(df['s'])).show()

+------+-------+
|     s|s_upper|
+------+-------+
|ab12cd| AB12CD|
+------+-------+



## Persisting a DataFrame

In [12]:
from pyspark import StorageLevel

In [None]:
StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)

In [19]:
df.persist(StorageLevel(True, True, False, False))

DataFrame[height: bigint, name: string]

In [20]:
df.is_cached

True

In [21]:
df.unpersist()

DataFrame[height: bigint, name: string]

In [22]:
df.is_cached

False

In [23]:
# If df is later rebound to something else, how can the cached data still be unpersisted?
# Keep the return value of persist()
dfp = df.persist(StorageLevel(True, True, False, False))

In [24]:
df = 1

In [25]:
df.unpersist()

AttributeError: 'int' object has no attribute 'unpersist'

In [26]:
dfp.unpersist()

DataFrame[height: bigint, name: string]

In [27]:
dfp.is_cached

False

In [16]:
StorageLevel.MEMORY_AND_DISK_2

StorageLevel(True, True, False, False, 2)

In [15]:
df.persist(StorageLevel.MEMORY_AND_DISK_2)

DataFrame[height: bigint, name: string]

In [None]:
df.cache()  # the default storage level (C{MEMORY_AND_DISK}).

## Worked example: disease code alignment

In [39]:
import os

import pandas as pd
from pyspark.sql.functions import lit, split, upper

from myutils import NLPDataPrepareWrapper as npw  # project-local helper module


def ill_align(city):
    """
    Align disease codes for one city.

    Relies on names defined elsewhere in the notebook:
    spark, data_dir, df_base, df_base_pd and list_df_base_ill.
    """
    print(city)
    df = spark.read.parquet(f'file://{os.path.join(data_dir, city, "register")}')
    # Extract the first disease from the diagnosis name
    print('Extracting the first disease from the name')
    split_pattern = r'[，,\^\s+，、;\.]'
    df = df.withColumn('out_hosp_diagnosisname_first', split(df.out_hosp_diagnosisname, split_pattern).getItem(0))
    print(f'df.count: {df.count()}')
    # Step 1: match on disease name
    print('Matching on disease name')
    df1 = df.join(df_base, df['out_hosp_diagnosisname_first'] == df_base['疾病名称'])
    print(f'df1.count: {df1.count()}')
    df1 = df1.withColumn('sim', lit(999))
    df1.write.parquet(f"file://{os.path.join(data_dir, city, 'disease_align_name')}")
    # Step 2: match the remaining rows on disease code
    print('Matching the remaining rows on disease code')
    dft = df.filter(~df['out_hosp_diagnosisname_first'].isin(list_df_base_ill))
    # Normalize the disease codes to upper case
    print('Normalizing disease codes')
    dft = dft.withColumn('out_hosp_diagnosiscode_upper', upper(dft['out_hosp_diagnosiscode']))
    df2 = dft.join(df_base, dft['out_hosp_diagnosiscode_upper'] == df_base['主要编码'])
    print(f'df2.count: {df2.count()}')
    df2 = df2.withColumn('sim', lit(999))
    df2 = df2.drop('out_hosp_diagnosiscode_upper')
    df2.write.parquet(f"file://{os.path.join(data_dir, city, 'disease_align_code')}")
    ll_df2_code = df2.select('out_hosp_diagnosiscode').rdd.map(lambda s: s.out_hosp_diagnosiscode).collect()
    ll_df2_code = list(set(ll_df2_code))
    # Step 3: TF-IDF matching on the disease names that are still unmatched
    print('TF-IDF matching on the remaining disease names')
    dft = dft.filter(~dft['out_hosp_diagnosiscode'].isin(ll_df2_code))
    # Collect all names that are still unmatched
    print('Collecting all unmatched names')
    ll_dft_name = dft.select(dft['out_hosp_diagnosisname']).distinct().rdd.map(lambda s: s['out_hosp_diagnosisname']).collect()
    df_target = pd.DataFrame({'name': ll_dft_name})
    # Match in chunks of 5000 names
    num = df_target.shape[0] // 5000 + 1
    dfr = pd.DataFrame()
    for i in range(num):
        c = i * 5000
        dftt = npw.match(df_base_pd, '疾病名称', df_target.iloc[c:(c + 5000)], 'name', 'aa')
        if dfr.empty:
            dfr = dftt
        else:
            dfr = pd.concat([dfr, dftt])
    dfr = dfr[['name', '疾病名称', '主要编码', 'sim']]
    # Cast to str so that Spark can infer the column types
    dfr.loc[:, 'name'] = dfr.name.map(str)
    dfr.loc[:, '疾病名称'] = dfr.疾病名称.map(str)
    dfr.loc[:, '主要编码'] = dfr.主要编码.map(str)
    dfr_spark = spark.createDataFrame(dfr)
    del dfr
    df3 = dft.join(dfr_spark, dft['out_hosp_diagnosisname'] == dfr_spark['name'])
    df3 = df3.drop('name')
    df3 = df3.drop('out_hosp_diagnosiscode_upper')
    print(f'df3.count: {df3.count()}')
    print('Writing df3')
    df3.write.parquet(f"file://{os.path.join(data_dir, city, 'disease_align_tfidf')}")
    print('complete!')

# sparkml API

## Creating a SparkSession instance

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
     .master("local[*]") \
     .appName("Word Count") \
     .config("spark.some.config.option", "some-value") \
     .getOrCreate()

## pyspark.ml.feature module
Wraps the functions used for feature engineering.

### Tokenization: Tokenizer

In [None]:
from pyspark.ml.feature import RegexTokenizer
rtokenizer = RegexTokenizer(inputCol='text', outputCol='words')
rtokenizer.setPattern(r'[.,\s]+')  # raw string, so \s reaches the regex engine unchanged
df3 = rtokenizer.transform(df1)
df3.show(5)

### Removing stop words

In [119]:
from pyspark.ml.feature import StopWordsRemover
remover = StopWordsRemover(inputCol="words", outputCol="rwords", stopWords=StopWordsRemover.loadDefaultStopWords('english'))
df4 = remover.transform(df3)
df4.show(2)

+-------+-------+-----+--------------------+--------------------+-------+--------+--------------------+--------------------+--------------------+
|doc_id1|doc_id2|soure|                name|            doc_name|country|typename|                text|               words|              rwords|
+-------+-------+-----+--------------------+--------------------+-------+--------+--------------------+--------------------+--------------------+
|1494604| 549745|    7|Curriculum Vitae-...|curriculum.vitae....|     en|Business|My professional r...|[my, professional...|[professional, re...|
|1494602| 604435|    7|Five Questions (S...|com.kevinhecker.t...|     en|  Puzzle|Five Questions is...|[five, questions,...|[five, questions,...|
+-------+-------+-----+--------------------+--------------------+-------+--------+--------------------+--------------------+--------------------+
only showing top 2 rows



### pyspark.ml.feature.HashingTF
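Maps a sequence of terms to their term frequencies using the hashing trick. It currently uses Austin Appleby's MurmurHash3 algorithm (MurmurHash3 x86 32) to compute the hash code of each term. Because a simple modulo turns the hash value into a column index, a power of two is recommended for the numFeatures parameter; otherwise the features will not be mapped evenly to the columns.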

In [31]:
from pyspark.ml.feature import HashingTF

In [86]:
df = spark.createDataFrame([(["a", "b", "c"],),
                            (["d", "b", "c"],),
                            (["d", "b", "c","d"],),
                            (["a", "b", "c","d"],)], ["words"])

hashingTF = HashingTF(numFeatures=10, inputCol="words", outputCol="tf")

df = hashingTF.transform(df)

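The result is represented as a SparseVector (a sparse vector, stored as a triple):
* the first element is numFeatures (the vector size),
* the second element is an array listing the indices that occur,
* the third element is an array of the same length as the second, giving the count at each of those indices.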
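To make the triple concrete, the sketch below expands one into an ordinary dense list. It is plain Python for illustration only; `to_dense` is a hypothetical helper, and the example triple `(10, [1, 2, 4], [1.0, 1.0, 2.0])` is chosen to match the shape described above.

```python
def to_dense(size, indices, values):
    """Expand a (size, indices, values) triple into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


print(to_dense(10, [1, 2, 4], [1.0, 1.0, 2.0]))
# [0.0, 1.0, 1.0, 0.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0]
```

Every position not listed in the index array is zero, which is why the sparse form is much smaller when numFeatures is large.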

In [87]:
df.show(truncate=False)

+------------+--------------------------------+
|words       |tf                              |
+------------+--------------------------------+
|[a, b, c]   |(10,[0,1,2],[1.0,1.0,1.0])      |
|[d, b, c]   |(10,[1,2,4],[1.0,1.0,1.0])      |
|[d, b, c, d]|(10,[1,2,4],[1.0,1.0,2.0])      |
|[a, b, c, d]|(10,[0,1,2,4],[1.0,1.0,1.0,1.0])|
+------------+--------------------------------+



### pyspark.ml.feature.IDF
Compute the Inverse Document Frequency (IDF) given a collection of documents.

https://blog.csdn.net/q1w2e3r4470/article/details/50534336

In [88]:
from pyspark.ml.feature import IDF

In [89]:
idf = IDF(minDocFreq=1, inputCol="tf", outputCol="tfidf")

model = idf.fit(df)

In [91]:
# The fitted IDF model is used for both the training set and the test set
df = model.transform(df)
df.show(truncate=False)

+------------+--------------------------------+---------------------------------------------------------------+
|words       |tf                              |tfidf                                                          |
+------------+--------------------------------+---------------------------------------------------------------+
|[a, b, c]   |(10,[0,1,2],[1.0,1.0,1.0])      |(10,[0,1,2],[0.5108256237659907,0.0,0.0])                      |
|[d, b, c]   |(10,[1,2,4],[1.0,1.0,1.0])      |(10,[1,2,4],[0.0,0.0,0.22314355131420976])                     |
|[d, b, c, d]|(10,[1,2,4],[1.0,1.0,2.0])      |(10,[1,2,4],[0.0,0.0,0.44628710262841953])                     |
|[a, b, c, d]|(10,[0,1,2,4],[1.0,1.0,1.0,1.0])|(10,[0,1,2,4],[0.5108256237659907,0.0,0.0,0.22314355131420976])|
+------------+--------------------------------+---------------------------------------------------------------+



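Note the term d in samples 3 and 4: its value in sample 3 is twice its value in sample 4, which shows that term frequency is taken into account, so the result is indeed TF-IDF.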
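The numbers in the `tfidf` column can be reproduced by hand. The sketch below assumes the smoothed formula from the Spark ML documentation, idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) is the number of documents containing term t. It is plain Python, and the `tf_rows` dicts are copied from the `tf` column above.

```python
import math

# Term frequencies from the tf column above, as {index: count}
tf_rows = [
    {0: 1.0, 1: 1.0, 2: 1.0},
    {1: 1.0, 2: 1.0, 4: 1.0},
    {1: 1.0, 2: 1.0, 4: 2.0},
    {0: 1.0, 1: 1.0, 2: 1.0, 4: 1.0},
]
m = len(tf_rows)

# Document frequency: in how many rows each index occurs
doc_freq = {}
for row in tf_rows:
    for idx in row:
        doc_freq[idx] = doc_freq.get(idx, 0) + 1

# Smoothed inverse document frequency (assumed formula, see lead-in)
idf = {idx: math.log((m + 1) / (n + 1)) for idx, n in doc_freq.items()}
tfidf_rows = [{idx: tf * idf[idx] for idx, tf in row.items()} for row in tf_rows]

for row in tfidf_rows:
    print(row)
```

Indices 1 and 2 occur in all four documents, so their IDF is log(5/5) = 0, which is why those positions are 0.0 in every row.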

### pyspark.ml.feature.Normalizer
L2_norm

In [92]:
from pyspark.ml.feature import Normalizer

In [97]:
normalizer = Normalizer(p=2.0, inputCol="tfidf", outputCol="tfidf_norm")
df = normalizer.transform(df)
df.select("tfidf tfidf_norm".split()).show(truncate=False)

+---------------------------------------------------------------+---------------------------------------------------------------+
|tfidf                                                          |tfidf_norm                                                     |
+---------------------------------------------------------------+---------------------------------------------------------------+
|(10,[0,1,2],[0.5108256237659907,0.0,0.0])                      |(10,[0,1,2],[1.0,0.0,0.0])                                     |
|(10,[1,2,4],[0.0,0.0,0.22314355131420976])                     |(10,[1,2,4],[0.0,0.0,1.0])                                     |
|(10,[1,2,4],[0.0,0.0,0.44628710262841953])                     |(10,[1,2,4],[0.0,0.0,1.0])                                     |
|(10,[0,1,2,4],[0.5108256237659907,0.0,0.0,0.22314355131420976])|(10,[0,1,2,4],[0.9163829172606391,0.0,0.0,0.40030282156497543])|
+---------------------------------------------------------------+---------------------------------------------------------------+
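With `p=2.0`, each vector is divided by its Euclidean length. The following plain-Python sketch checks the last row of the table; `l2_normalize` is a hypothetical helper and `row4` holds the non-zero-index values copied from the `tfidf` column.

```python
import math


def l2_normalize(values):
    """Divide each value by the vector's L2 norm (all-zero vectors are returned unchanged)."""
    norm = math.sqrt(sum(v * v for v in values))
    return [v / norm if norm else 0.0 for v in values]


row4 = [0.5108256237659907, 0.0, 0.0, 0.22314355131420976]
normalized = l2_normalize(row4)
print(normalized)
```

The result has length 1, which is why rows with a single non-zero entry normalize to exactly 1.0 at that position.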

## pyspark.ml.linalg module
MLlib utilities for linear algebra. For dense vectors, MLlib uses the NumPy array type, so you can simply pass NumPy arrays around. For sparse vectors, users can construct a SparseVector object from MLlib or pass SciPy scipy.sparse column vectors if SciPy is available in their environment.

In [98]:
df.printSchema()

root
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tf: vector (nullable = true)
 |-- tfidf: vector (nullable = true)
 |-- tfidf_norm: vector (nullable = true)



In [113]:
rdd = df.select('tfidf_norm').rdd.map(lambda x:x[0])

In [114]:
rdd.take(5)

[SparseVector(10, {0: 1.0, 1: 0.0, 2: 0.0}),
 SparseVector(10, {1: 0.0, 2: 0.0, 4: 1.0}),
 SparseVector(10, {1: 0.0, 2: 0.0, 4: 1.0}),
 SparseVector(10, {0: 0.9164, 1: 0.0, 2: 0.0, 4: 0.4003})]

In [120]:
from pyspark.ml.linalg import SparseVector, Vectors

In [123]:
base = [SparseVector(10, {0: 1.0, 1: 0.0, 2: 0.0}), SparseVector(10, {0: 2.0, 1: 0.0, 2: 0.0})]

In [125]:
rdd1 = rdd.map(lambda x: [x.dot(i) for i in base])
rdd1.take(5)

[[1.0, 2.0], [0.0, 0.0], [0.0, 0.0], [0.9163829172606391, 1.8327658345212783]]

In [126]:
rdd2 = rdd1.map(lambda x: x.index(max(x)))
rdd2.take(5)

[1, 0, 0, 1]
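The two `map` steps above score each row against every base vector with a dot product and then pick the index of the best score. The plain-Python sketch below mirrors that logic with dicts standing in for sparse vectors; `sparse_dot` and `best_match` are hypothetical helpers, and the rows are the non-zero entries of the vectors shown earlier.

```python
def sparse_dot(a, b):
    """Dot product of two sparse vectors stored as {index: value} dicts."""
    if len(a) > len(b):
        a, b = b, a  # iterate over the shorter one
    return sum(v * b.get(i, 0.0) for i, v in a.items())


def best_match(vec, base):
    """Index of the base vector with the largest dot product (first one on ties)."""
    scores = [sparse_dot(vec, b) for b in base]
    return scores.index(max(scores))


base = [{0: 1.0}, {0: 2.0}]
rows = [
    {0: 1.0},
    {4: 1.0},
    {4: 1.0},
    {0: 0.9163829172606391, 4: 0.40030282156497543},
]
matches = [best_match(r, base) for r in rows]
print(matches)  # [1, 0, 0, 1]
```

Rows 2 and 3 score 0.0 against both base vectors, so `index(max(...))` falls back to the first position, which explains the zeros in the result.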

In [121]:
a = Vectors.sparse(4, [(0, 1), (3, 4)])

In [122]:
a.toArray()

array([1., 0., 0., 4.])

# examples & api

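## sparkmlhw01  Word count
Implement a WordCount example with Dataset operations and show the first 50 rows in descending order of count. Words are lowercased, and punctuation and stop words are removed. Points tested:  
1)  reading files with Spark  
2)  Dataset transformations and aggregations  
The focus is on data cleaning: lowercasing and removing punctuation and stop words. The stop-word set and the punctuation set have to be defined by hand.  
3)  sorting and displaying a Dataset  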

In [12]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [14]:
df1 = spark.read.csv("file:///Users/luoyonggui/PycharmProjects/mayiexamples/pyspark/data/wordcount", sep='|', header=False)
type(df1)

pyspark.sql.dataframe.DataFrame

In [15]:
df1.show(5)

+---+--------------------+
|_c0|                 _c1|
+---+--------------------+
|  1|The Apache Hadoop...|
|  2|Apache Spark is a...|
|  3|Apache Storm is a...|
|  4|Below is a high-l...|
+---+--------------------+



In [16]:
df1 = df1.withColumnRenamed('_c0','index').withColumnRenamed('_c1','content')

In [17]:
df1.dtypes

[('index', 'string'), ('content', 'string')]

In [18]:
df2 = df1.select('content')

In [19]:
df2.show(5)

+--------------------+
|             content|
+--------------------+
|The Apache Hadoop...|
|Apache Spark is a...|
|Apache Storm is a...|
|Below is a high-l...|
+--------------------+



In [20]:
df2.rdd.take(1)

[Row(content='The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures\ufeff.')]

In [21]:
rdd1 = df2.rdd.flatMap(lambda x:x.content.split())

In [22]:
rdd1.first()

'The'

In [23]:
rdd1.take(5)

['The', 'Apache', 'Hadoop', 'software', 'library']

In [24]:
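# Note: the first filter only drops tokens that are exactly one punctuation character;
# punctuation attached to a word (e.g. 'system.') is kept, as the output further down shows
rdd2 = rdd1.map(lambda x:x.lower())\
    .filter(lambda x: x not in list(",.:!';"))\
    .filter(lambda x: x not in ["the","of","a","to","that","it","for","is","and"])\
    .map(lambda x:(x,1))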
rdd2.take(5)

[('apache', 1),
 ('hadoop', 1),
 ('software', 1),
 ('library', 1),
 ('framework', 1)]

In [25]:
# rdd3 = rdd2.reduceByKey(lambda a,b:a+b)
# rdd3.take(5)

In [26]:
from operator import add
rdd3 = rdd2.reduceByKey(add)
rdd3.take(5)

[('apache', 4),
 ('hadoop', 2),
 ('software', 1),
 ('library', 2),
 ('framework', 1)]

In [27]:
df3 = rdd3.toDF()
df3.show(5)

+---------+---+
|       _1| _2|
+---------+---+
|   apache|  4|
|   hadoop|  2|
| software|  1|
|  library|  2|
|framework|  1|
+---------+---+
only showing top 5 rows



In [28]:
df3.sort(df3['_2'].desc()).show()

+-----------+---+
|         _1| _2|
+-----------+---+
|     apache|  4|
|      storm|  3|
|      spark|  3|
|     hadoop|  2|
|    library|  2|
|distributed|  2|
|       data|  2|
|   designed|  2|
|computation|  2|
| processing|  2|
|    system.|  2|
|programming|  2|
|        sql|  2|
|       each|  2|
|         on|  2|
|    cluster|  2|
|         be|  2|
| high-level|  2|
|         in|  2|
|   supports|  2|
+-----------+---+
only showing top 20 rows
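
The sorted counts above still contain `system.` because the punctuation filter only drops tokens that are exactly one punctuation character; punctuation attached to a word survives. The following pure-Python sketch mirrors the same flatMap, map, filter, reduceByKey flow and strips attached punctuation before counting. The sample sentences and the names `word_count`, `STOP_WORDS`, and `PUNCTUATION` are illustrative assumptions, not part of the notebook.

```python
from collections import Counter

# Same stop words as the notebook's filter step.
STOP_WORDS = {"the", "of", "a", "to", "that", "it", "for", "is", "and"}
PUNCTUATION = ",.:!';"

def word_count(lines):
    """Count words across lines, mirroring flatMap -> map -> filter -> reduceByKey."""
    counts = Counter()
    for line in lines:
        for token in line.split():                   # flatMap: line -> tokens
            word = token.lower().strip(PUNCTUATION)  # also strips attached punctuation
            if word and word not in STOP_WORDS:      # filter
                counts[word] += 1                    # reduceByKey(add)
    return counts

# Hypothetical sample input, not the notebook's data file.
sample = ["Apache Spark is a fast system.", "Apache Storm is a real-time system"]
counts = word_count(sample)
print(counts.most_common(3))
```

With this variant `system.` and `system` fall into the same bucket, which is usually what a word count is meant to report.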



## sparkmlhw02
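Build a document classifier with ML Pipelines. The model must be saved, then loaded again to predict on test samples. Points assessed:

1) Reading files with Spark

2) Data cleaning, which tests basic Dataset operations

3) Building the classifier pipeline, which tests assembling the various transformation stages

4) Loading the model, reading the test data, and testing the model

Data format:

myapp_id|typenameid|typename|myapp_word|myapp_word_all

The document ID field is: myapp_id

The document category field is: typenameid

The document content field is: myapp_word_all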
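
To make the layout concrete, here is a small pure-Python sketch that parses two rows in this pipe-delimited format and picks out the ID, label, and content fields. The rows are shortened, hypothetical samples, and the output keys `doc_id`, `label`, and `text` are names chosen for illustration.

```python
import csv
import io

# Hypothetical two-row sample in the documented layout (values shortened).
raw = (
    "myapp_id|typenameid|typename|myapp_word|myapp_word_all\n"
    "1376533|2|action|game, android, world|game, android, world, control\n"
    "1376542|2|action|game|game, app, enjoy, free\n"
)

reader = csv.DictReader(io.StringIO(raw), delimiter="|")
rows = [
    {
        "doc_id": r["myapp_id"],
        "label": int(r["typenameid"]),  # every CSV field is read as a string
        "text": r["myapp_word_all"],
    }
    for r in reader
]
print(rows[0])
```

The explicit `int(...)` matters for the same reason the Spark version needs a cast later: fields read from a delimited text file arrive as strings.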

In [21]:
df1 = spark.read.csv("file:///Users/luoyonggui/PycharmProjects/mayiexamples/sparkml/data/doc_class.dat", sep='|', header=True)
df1.show(5)

+--------+----------+--------+--------------------+--------------------+
|myapp_id|typenameid|typename|          myapp_word|      myapp_word_all|
+--------+----------+--------+--------------------+--------------------+
| 1376533|         2|  action|game, android, world|game, android, wo...|
| 1376542|         2|  action|                game|game, app, enjoy,...|
| 1376603|         2|  action|run, tap, collect...|run, tap, collect...|
| 1376792|         2|  action|                 run|run, ath, game, m...|
| 1376941|         2|  action|fight, game, play...|fight, game, play...|
+--------+----------+--------+--------------------+--------------------+
only showing top 5 rows



In [22]:
df1.count()

334500

In [23]:
df1.dtypes

[('myapp_id', 'string'),
 ('typenameid', 'string'),
 ('typename', 'string'),
 ('myapp_word', 'string'),
 ('myapp_word_all', 'string')]

In [24]:
df1.columns

['myapp_id', 'typenameid', 'typename', 'myapp_word', 'myapp_word_all']

### checkpoint()

In [25]:
spark.sparkContext.setCheckpointDir('CheckpointDir/tt123')

In [26]:
# checkpoint() returns a new checkpointed DataFrame; assign the result to keep using it
df1.checkpoint()

DataFrame[myapp_id: string, typenameid: string, typename: string, myapp_word: string, myapp_word_all: string]

In [27]:
df1 = df1.withColumnRenamed('typenameid','label')
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
df1 = df1.withColumn('label', col('label').cast(IntegerType()))

In [28]:
df1.dtypes

[('myapp_id', 'string'),
 ('label', 'int'),
 ('typename', 'string'),
 ('myapp_word', 'string'),
 ('myapp_word_all', 'string')]

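### Split into training and test sets (rows are assigned at random)

In [29]:
# Split into training and test sets; each row is assigned to a split at random
train_set, test_set = df1.randomSplit([0.9,0.1])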

In [30]:
train_set.count()

301005
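
The training count is close to, but not exactly, 90% of the 334500 rows, because the split is random per row rather than an exact cut; passing `seed=` to `randomSplit` makes it repeatable. The sketch below illustrates per-row weighted assignment in pure Python. It is not Spark's actual implementation, and `random_split` is a hypothetical helper.

```python
import random
from itertools import accumulate

def random_split(rows, weights, seed=None):
    """Assign each row independently to one split, with probability given by its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    bounds = list(accumulate(w / total for w in weights))  # cumulative shares, e.g. [0.9, 1.0]
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            # The last bucket also catches any floating-point rounding at the top end.
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(10_000), [0.9, 0.1], seed=42)
print(len(train), len(test))
```

The two sizes always add up to the input size, while the individual sizes only approximate the requested proportions.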

In [31]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.ml.linalg import Vector

### Model training: Pipeline with Tokenizer, HashingTF, LogisticRegression

In [36]:
from time import time
start = time()
tokenizer = Tokenizer(inputCol='myapp_word_all', outputCol='words')
hashingTF = HashingTF(numFeatures=1000, inputCol='words', outputCol='features')

lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(train_set)
print(f'{time()-start}s')

17.490482807159424s
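
The first two pipeline stages can be pictured in plain Python. `Tokenizer` lowercases and splits on whitespace, so comma-separated content keeps its commas on the tokens (the `words` column later in this notebook shows values like `[sms,, messag,, f...`). `HashingTF` then maps each token to one of `numFeatures` buckets and counts occurrences. This is only a sketch: `zlib.crc32` stands in for Spark's MurmurHash3, so bucket indices differ from Spark's, and `tokenize` and `hashing_tf` are hypothetical helper names.

```python
import zlib
from collections import Counter

def tokenize(text):
    """Lowercase and split on whitespace, roughly what pyspark.ml.feature.Tokenizer does."""
    return text.lower().split()

def hashing_tf(tokens, num_features=1000):
    """Map each token to a bucket in [0, num_features) and count occurrences."""
    # Assumption: crc32 stands in for Spark's MurmurHash3; indices differ from Spark's.
    buckets = Counter(zlib.crc32(t.encode("utf-8")) % num_features for t in tokens)
    return dict(sorted(buckets.items()))  # sparse {bucket index: term frequency}

tokens = tokenize("game, android, world, game,")
tf = hashing_tf(tokens)
print(tokens)
print(tf)
```

A small `num_features` makes unrelated tokens collide in the same bucket more often, which is one reason the parameter is worth tuning.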


### Model prediction

In [37]:
p = model.transform(test_set)
p.select("myapp_id","label","typename","probability","prediction").show()

+--------+-----+-----------------+--------------------+----------+
|myapp_id|label|         typename|         probability|prediction|
+--------+-----+-----------------+--------------------+----------+
| 1376501|    4|           arcade|[4.11158459184166...|       4.0|
| 1376523|   14|           casual|[2.94084062076931...|      14.0|
| 1376606|    4|           arcade|[6.08513932347101...|       4.0|
| 1376635|    4|           arcade|[6.08513932347101...|       4.0|
| 1376656|   16|    communication|[1.78901353628456...|      16.0|
| 1376782|   14|           casual|[1.46631980338166...|      14.0|
| 1377005|   16|    communication|[1.78901353628456...|      16.0|
| 1377059|    4|           arcade|[6.08513932347101...|       4.0|
| 1377153|   16|    communication|[1.78901353628456...|      16.0|
| 1377192|    7|books & reference|[1.21296495436360...|       7.0|
| 1377195|    4|           arcade|[5.37814702677680...|       4.0|
| 1377499|   16|    communication|[1.78901353628456...|      1

### Saving and loading the model

In [38]:
# Save and load the model
model.save('file:///tmp/testModel')

In [39]:
!ls /tmp/testModel

[1m[36mmetadata[m[m [1m[36mstages[m[m


In [40]:
! open /tmp/testModel

In [41]:
savedModel = PipelineModel.load('file:///tmp/testModel')

In [42]:
p = savedModel.transform(test_set)
p.select("myapp_id","label","typename","probability","prediction").show(5)

+--------+-----+-------------+--------------------+----------+
|myapp_id|label|     typename|         probability|prediction|
+--------+-----+-------------+--------------------+----------+
| 1376501|    4|       arcade|[4.11158459184166...|       4.0|
| 1376523|   14|       casual|[2.94084062076931...|      14.0|
| 1376606|    4|       arcade|[6.08513932347101...|       4.0|
| 1376635|    4|       arcade|[6.08513932347101...|       4.0|
| 1376656|   16|communication|[1.78901353628456...|      16.0|
+--------+-----+-------------+--------------------+----------+
only showing top 5 rows



### Saving and loading the pipeline

In [43]:
pipeline.save('file:///tmp/pp')

In [46]:
!ls /tmp/pp

[1m[36mmetadata[m[m [1m[36mstages[m[m


In [48]:
pp = Pipeline.load('file:///tmp/pp')

## sparkmlhw03
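The assignment for lesson 3 is the same as for lesson 2, except that it must use cross-validation and must also compute classification metrics. It consists of two experiments: the first designs a binary classifier, the second a multiclass classifier.

Build a document classifier with ML Pipelines. The model must be saved, then loaded again to predict on test samples. Points assessed:

1) Reading files with Spark

2) Data cleaning, which tests basic Dataset operations

3) Building the classifier pipeline, which tests assembling the various transformation stages

4) Loading the model, reading the test data, and testing the model

5) Key point: define your own parameter grid for cross-validation and use the cross-validated model

6) Key point: compute metrics for the classification results

Data format:

myapp_id|typenameid|typename|myapp_word|myapp_word_all

The document ID field is: myapp_id

The document category field is: typenameid

The document content field is: myapp_word_all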

In [51]:
df1 = spark.read.csv("file:///Users/luoyonggui/PycharmProjects/mayiexamples/sparkml/data/doc_class.dat", sep='|', header=True)
df1.show(5)

+--------+----------+--------+--------------------+--------------------+
|myapp_id|typenameid|typename|          myapp_word|      myapp_word_all|
+--------+----------+--------+--------------------+--------------------+
| 1376533|         2|  action|game, android, world|game, android, wo...|
| 1376542|         2|  action|                game|game, app, enjoy,...|
| 1376603|         2|  action|run, tap, collect...|run, tap, collect...|
| 1376792|         2|  action|                 run|run, ath, game, m...|
| 1376941|         2|  action|fight, game, play...|fight, game, play...|
+--------+----------+--------+--------------------+--------------------+
only showing top 5 rows



In [57]:
df1.collect()[:5]

[Row(myapp_id='1376533', typenameid='2', typename='action', myapp_word='game, android, world', myapp_word_all='game, android, world, control, devic, experi, free, gameplay, play, screen, time, touch, war, action, addict, app, ath, attack, battl, challeng, collect, complet, descript, easi, enemi, enjoy, featur, fight, find, friend, fun, gamec, graphic, great, gun, high, kill, level, make, mission, mode, move, player, power, read, real, run, score, shoot, shooter, simpl, skill, sound, special, surviv, tap, uniqu, upgrad, weapon, zombi'),
 Row(myapp_id='1376542', typenameid='2', typename='action', myapp_word='game', myapp_word_all='game, app, enjoy, free, high, play, run, action, addict, android, ath, attack, battl, challeng, collect, complet, control, descript, devic, easi, enemi, experi, featur, fight, find, friend, fun, gamec, gameplay, graphic, great, gun, kill, level, make, mission, mode, move, player, power, read, real, score, screen, shoot, shooter, simpl, skill, sound, special, su

In [52]:
df1.select('typenameid').distinct().count()

46

### groupby()

In [58]:
df1.groupBy('typenameid').agg({'myapp_id': 'count'}).collect()

[Row(typenameid='7', count(myapp_id)=13122),
 Row(typenameid='15', count(myapp_id)=1785),
 Row(typenameid='11', count(myapp_id)=3828),
 Row(typenameid='29', count(myapp_id)=11588),
 Row(typenameid='42', count(myapp_id)=1439),
 Row(typenameid='3', count(myapp_id)=2181),
 Row(typenameid='30', count(myapp_id)=10974),
 Row(typenameid='34', count(myapp_id)=15295),
 Row(typenameid='8', count(myapp_id)=245),
 Row(typenameid='22', count(myapp_id)=10084),
 Row(typenameid='28', count(myapp_id)=587),
 Row(typenameid='16', count(myapp_id)=9981),
 Row(typenameid='35', count(myapp_id)=3148),
 Row(typenameid='47', count(myapp_id)=2582),
 Row(typenameid='43', count(myapp_id)=26346),
 Row(typenameid='5', count(myapp_id)=213),
 Row(typenameid='31', count(myapp_id)=17319),
 Row(typenameid='18', count(myapp_id)=2782),
 Row(typenameid='27', count(myapp_id)=5766),
 Row(typenameid='17', count(myapp_id)=17553),
 Row(typenameid='26', count(myapp_id)=6107),
 Row(typenameid='46', count(myapp_id)=1444),
 Row(type

In [53]:
tokenizer = Tokenizer(inputCol='myapp_word_all', outputCol='words')
hashingTF = HashingTF(inputCol='words', outputCol='features')
df2 = df1.withColumnRenamed('typenameid','label').withColumn('label', col('label').cast(IntegerType()))
# Split into training and test sets; each row is assigned to a split at random
train_set, test_set = df2.randomSplit([0.9,0.1])
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

### Grid search over parameters

In [59]:
from pyspark.ml.tuning import ParamGridBuilder
paramGrid = ParamGridBuilder().addGrid(hashingTF.numFeatures,[10, 100, 1000]).addGrid(lr.regParam, [0.1, 0.01]).build()
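
The grid is the Cartesian product of the candidate values, so this call yields 3 × 2 = 6 parameter combinations. A pure-Python sketch of the same enumeration, plus the number of model fits implied by the 2-fold cross-validation used later in this section (the `grid`, `param_maps`, and `total_fits` names are illustrative):

```python
from itertools import product

# Same candidate values as the ParamGridBuilder call above.
grid = {
    "numFeatures": [10, 100, 1000],
    "regParam": [0.1, 0.01],
}

param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]
for pm in param_maps:
    print(pm)

num_folds = 2
# Each combination is trained once per fold; the best one is then refit on the full training set.
total_fits = len(param_maps) * num_folds + 1
print(total_fits)
```

Because cost grows multiplicatively with every added parameter and fold, it helps to estimate the fit count before launching a long cross-validation run.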

### Binary classification

In [60]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()

### Cross-validation

In [61]:
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=pipeline, evaluator=evaluator, estimatorParamMaps=paramGrid, numFolds=2)

In [62]:
start = time()
cvModel = cv.fit(train_set)
print(time()-start)

175.16829204559326


In [64]:
p = cvModel.transform(test_set)
p.select(['label', 'rawPrediction', 'probability', 'prediction']).show()

+-----+--------------------+--------------------+----------+
|label|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+----------+
|   16|[-7.4650506355219...|[3.28917268712459...|      43.0|
|    2|[-7.4655121275154...|[3.76707131453514...|      31.0|
|   16|[-7.4650506355219...|[3.28917268712459...|      43.0|
|    4|[-7.4650726649511...|[3.09555661679601...|      43.0|
|   14|[-7.4646128253141...|[3.53625473567110...|      43.0|
|    4|[-7.4650726649511...|[3.09555661679601...|      43.0|
|   11|[-7.4654049429803...|[3.73730792079566...|      29.0|
|    4|[-7.4650726649511...|[3.09555661679601...|      43.0|
|   11|[-7.4654049429803...|[3.73730792079566...|      29.0|
|   16|[-7.4650400569118...|[3.49055120567496...|      43.0|
|    2|[-7.4655227447676...|[3.78054958781725...|      31.0|
|    4|[-7.4650889276234...|[2.81868035613385...|       4.0|
|   14|[-7.4645965626417...|[3.47603574364158...|      14.0|
|   17|[-7.4650890332807

In [70]:
evaluator.evaluate(p)

1.0
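
`BinaryClassificationEvaluator` reports the area under the ROC curve by default and assumes a two-class label. The label column here has 46 distinct classes, so the 1.0 above is probably not a meaningful score. For the binary experiment, one option (an assumption, not something the notebook does) is to relabel a single class as positive and all others as negative before training. The pure-Python sketch below shows that relabeling and a rank-based AUC on made-up scores; `binarize`, `auc`, and the sample data are hypothetical.

```python
def binarize(labels, positive_class):
    """One-vs-rest relabeling: 1 for the chosen class, 0 for every other class."""
    return [1 if y == positive_class else 0 for y in labels]

def auc(y_true, scores):
    """Area under the ROC curve: chance that a random positive outranks a random negative."""
    pos = [s for y, s in zip(y_true, scores) if y == 1]
    neg = [s for y, s in zip(y_true, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("AUC needs at least one positive and one negative example")
    # Ties between a positive and a negative score count as half a win.
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

# Hypothetical class ids and classifier scores for class 4.
labels = [4, 16, 4, 2, 14, 4]
scores = [0.9, 0.2, 0.4, 0.5, 0.1, 0.8]
y = binarize(labels, positive_class=4)
print(auc(y, scores))
```

The explicit error for a missing class reflects the underlying point: AUC is undefined unless both positives and negatives are present.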

### Multiclass classification

In [66]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# With no arguments the evaluator uses its default metric, F1
evaluator1 = MulticlassClassificationEvaluator()

# Same evaluator, but scoring by accuracy
evaluator2 = MulticlassClassificationEvaluator(metricName='accuracy')
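
For reference, the two metrics these evaluators select can be computed by hand from (label, prediction) pairs. The sketch below does so in pure Python on made-up pairs; the F1 here is averaged with weights proportional to each class's true count, and `accuracy`, `weighted_f1`, and the sample lists are illustrative names and data.

```python
from collections import Counter

def accuracy(y_true, y_pred):
    """Fraction of predictions that equal the true label."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)

def weighted_f1(y_true, y_pred):
    """Per-class F1, averaged with weights proportional to each class's true count."""
    support = Counter(y_true)
    total = 0.0
    for cls, n in support.items():
        tp = sum(t == cls and p == cls for t, p in zip(y_true, y_pred))
        fp = sum(t != cls and p == cls for t, p in zip(y_true, y_pred))
        fn = n - tp
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += f1 * n
    return total / len(y_true)

# Hypothetical labels and predictions.
y_true = [4, 14, 4, 16, 7, 4]
y_pred = [4, 14, 4, 16, 4, 43]
print(accuracy(y_true, y_pred), weighted_f1(y_true, y_pred))
```

On heavily imbalanced classes, as in this data set, the two metrics can diverge noticeably, which is a reason to report both.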

In [67]:
cv2 = CrossValidator(estimator=pipeline, evaluator=evaluator1, estimatorParamMaps=paramGrid, numFolds=2)

cv3 = CrossValidator(estimator=pipeline, evaluator=evaluator2, estimatorParamMaps=paramGrid, numFolds=2)

In [68]:
start = time()
cvModel2 = cv2.fit(train_set)
p2 = cvModel2.transform(test_set)
p2.show()
print(time()-start)

+--------+-----+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|myapp_id|label|         typename|          myapp_word|      myapp_word_all|               words|            features|       rawPrediction|         probability|prediction|
+--------+-----+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
| 1376505|   16|    communication|sms, messag, free...|sms, messag, free...|[sms,, messag,, f...|(1000,[39,52,57,5...|[-7.4647759316632...|[1.65150546698479...|      16.0|
| 1376533|    2|           action|game, android, world|game, android, wo...|[game,, android,,...|(1000,[10,39,40,4...|[-7.4647958040277...|[1.07198843263364...|       2.0|
| 1376604|   16|    communication|app, phone, devic...|app, phone, devic...|[app,, phone,, de...|(1000,[39,52,57,5...|[-7.4647759316632...|[

In [69]:
start = time()
cvModel3 = cv3.fit(train_set)
p3 = cvModel3.transform(test_set)
p3.show()
print(time()-start)

+--------+-----+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|myapp_id|label|         typename|          myapp_word|      myapp_word_all|               words|            features|       rawPrediction|         probability|prediction|
+--------+-----+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
| 1376505|   16|    communication|sms, messag, free...|sms, messag, free...|[sms,, messag,, f...|(1000,[39,52,57,5...|[-7.4647759316632...|[1.65150546698479...|      16.0|
| 1376533|    2|           action|game, android, world|game, android, wo...|[game,, android,,...|(1000,[10,39,40,4...|[-7.4647958040277...|[1.07198843263364...|       2.0|
| 1376604|   16|    communication|app, phone, devic...|app, phone, devic...|[app,, phone,, de...|(1000,[39,52,57,5...|[-7.4647759316632...|[

## sparkmlhw04
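Build a document classifier with ML Pipelines. This involves cleaning the documents, extracting document keywords, computing document vectors, and extracting keywords for each topic, then building a document topic classifier and running training, testing, and classification metrics.

Data: data_app

Data format: doc_id1| doc_id2| soure|name|doc_name|country| typename|text

The document ID field is: doc_id1   
The document name field is: doc_name   
The document category field is: typename   
The document content field is: text

The steps are:   
1) Document cleaning   
2) Document keyword extraction   
3) Topic keyword extraction   
4) Document vectorization, testing three vectorization methods   
5) Building the topic classifier   
6) Training, testing, and metrics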

In [71]:
!pwd

/Users/luoyonggui/PycharmProjects/mayiexamples/sparkml


In [73]:
df1 = spark.read.csv("file:///Users/luoyonggui/PycharmProjects/mayiexamples/sparkml/data/data_app", sep='|', header=False)

In [80]:
df1 = df1.withColumnRenamed('_c0','doc_id1')\
    .withColumnRenamed('_c1','doc_id2')\
    .withColumnRenamed('_c2','soure')\
    .withColumnRenamed('_c3','name')\
    .withColumnRenamed('_c4','doc_name')\
    .withColumnRenamed('_c5','country')\
    .withColumnRenamed('_c6','typename')\
    .withColumnRenamed('_c7','text')

In [81]:
df1.show(5)

+-------+-------+-----+--------------------+--------------------+-------+----------------+--------------------+
|doc_id1|doc_id2|soure|                name|            doc_name|country|        typename|                text|
+-------+-------+-----+--------------------+--------------------+-------+----------------+--------------------+
|1494604| 549745|    7|Curriculum Vitae-...|curriculum.vitae....|     en|        Business|My professional r...|
|1494602| 604435|    7|Five Questions (S...|com.kevinhecker.t...|     en|          Puzzle|Five Questions is...|
|1494599| 495855|    7|      RhymasaurusRex|com.yeti.rhymasau...|     en|   Entertainment|Are you a songwri...|
|1494596| 714480|    7|            helphelp|com.triplekingkon...|     en|       Lifestyle|guardian angel %2...|
|1494593| 568853|    7|Calorie Intake Ca...|com.piusvelte.cal...|     en|Health & Fitness|Description Estim...|
+-------+-------+-----+--------------------+--------------------+-------+----------------+--------------

In [95]:
df1.count()

335317

In [83]:
df1.select('typename').distinct().collect()

[Row(typename='en'),
 Row(typename='Music & Audio'),
 Row(typename='Education'),
 Row(typename='Trivia'),
 Row(typename='Entertainment'),
 Row(typename='Cards & Casino'),
 Row(typename='Adventure'),
 Row(typename='Arcade'),
 Row(typename='Sports'),
 Row(typename='Travel & Local'),
 Row(typename=None),
 Row(typename='Brain & Puzzle'),
 Row(typename='Sports Games'),
 Row(typename='Role Playing'),
 Row(typename='Media & Video'),
 Row(typename='Finance'),
 Row(typename='Personalization'),
 Row(typename='Arcade & Action'),
 Row(typename='Racing'),
 Row(typename='Tools'),
 Row(typename='Family'),
 Row(typename='Educational'),
 Row(typename='Comics'),
 Row(typename='Social'),
 Row(typename='Libraries & Demo'),
 Row(typename='Shopping'),
 Row(typename='Health & Fitness'),
 Row(typename='Productivity'),
 Row(typename='Card'),
 Row(typename='Casino'),
 Row(typename='Music'),
 Row(typename='Photography'),
 Row(typename='Brain &amp; Puzzle'),
 Row(typename='Lifestyle'),
 Row(typename='Business'),
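
The distinct values above show some label noise: `Brain & Puzzle` and `Brain &amp; Puzzle` are the same category with an HTML-escaped ampersand, and `en` and `None` probably come from rows with shifted or missing fields. A minimal pure-Python sketch of normalizing such labels before training follows; `normalize_typename` and the `INVALID` set are hypothetical, and treating `en` as invalid is an assumption.

```python
import html

# Assumption: a value that looks like a country code rather than a category marks a bad row.
INVALID = {"en"}

def normalize_typename(value):
    """Return a cleaned category name, or None when the row should be dropped."""
    if value is None:
        return None
    cleaned = html.unescape(value).strip()
    if not cleaned or cleaned in INVALID:
        return None
    return cleaned

raw = ["Brain & Puzzle", "Brain &amp; Puzzle", "en", None, "Music & Audio"]
labels = sorted({c for c in map(normalize_typename, raw) if c is not None})
print(labels)
```

In Spark the same function could be wrapped in a UDF and followed by a filter on non-null results.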


### Tokenization (tokenizer)

In [116]:
from pyspark.ml.feature import RegexTokenizer
rtokenizer = RegexTokenizer(inputCol='text', outputCol='words')
rtokenizer.setPattern(r'[.,\s]+')  # raw string avoids an invalid-escape warning for \s
df3 = rtokenizer.transform(df1)
df3.show(5)

+-------+-------+-----+--------------------+--------------------+-------+----------------+--------------------+--------------------+
|doc_id1|doc_id2|soure|                name|            doc_name|country|        typename|                text|               words|
+-------+-------+-----+--------------------+--------------------+-------+----------------+--------------------+--------------------+
|1494604| 549745|    7|Curriculum Vitae-...|curriculum.vitae....|     en|        Business|My professional r...|[my, professional...|
|1494602| 604435|    7|Five Questions (S...|com.kevinhecker.t...|     en|          Puzzle|Five Questions is...|[five, questions,...|
|1494599| 495855|    7|      RhymasaurusRex|com.yeti.rhymasau...|     en|   Entertainment|Are you a songwri...|[are, you, a, son...|
|1494596| 714480|    7|            helphelp|com.triplekingkon...|     en|       Lifestyle|guardian angel %2...|[guardian, angel,...|
|1494593| 568853|    7|Calorie Intake Ca...|com.piusvelte.cal...|    
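
With its defaults, `RegexTokenizer` treats the pattern as a separator, lowercases the text, and keeps tokens of at least one character. The effect of this particular pattern can be checked quickly in pure Python; `regex_tokenize` is a hypothetical helper and the sample sentence is made up.

```python
import re

PATTERN = re.compile(r"[.,\s]+")  # same pattern as the RegexTokenizer above

def regex_tokenize(text, min_token_length=1):
    """Lowercase, split on runs of '.', ',' or whitespace, and drop too-short tokens."""
    return [t for t in PATTERN.split(text.lower()) if len(t) >= min_token_length]

print(regex_tokenize("My professional resume, in one app. Free!"))
```

Characters outside the pattern, such as `!`, stay attached to their token, so the pattern needs extending if other punctuation should be removed too.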

### Stop-word removal

In [119]:
from pyspark.ml.feature import StopWordsRemover
remover = StopWordsRemover(inputCol="words", outputCol="rwords", stopWords=StopWordsRemover.loadDefaultStopWords('english'))
df4 = remover.transform(df3)
df4.show(2)

+-------+-------+-----+--------------------+--------------------+-------+--------+--------------------+--------------------+--------------------+
|doc_id1|doc_id2|soure|                name|            doc_name|country|typename|                text|               words|              rwords|
+-------+-------+-----+--------------------+--------------------+-------+--------+--------------------+--------------------+--------------------+
|1494604| 549745|    7|Curriculum Vitae-...|curriculum.vitae....|     en|Business|My professional r...|[my, professional...|[professional, re...|
|1494602| 604435|    7|Five Questions (S...|com.kevinhecker.t...|     en|  Puzzle|Five Questions is...|[five, questions,...|[five, questions,...|
+-------+-------+-----+--------------------+--------------------+-------+--------+--------------------+--------------------+--------------------+
only showing top 2 rows
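
The `rwords` column is simply `words` with stop words filtered out, compared case-insensitively by default. A pure-Python sketch of that behaviour follows. The tiny `STOP_WORDS` set is hypothetical (Spark's default English list is much longer), and `remove_stop_words` is an illustrative helper.

```python
# Hypothetical, tiny stop-word list; Spark's English default list is much longer.
STOP_WORDS = {"my", "is", "a", "are", "you"}

def remove_stop_words(tokens, stop_words=STOP_WORDS, case_sensitive=False):
    """Drop every token that appears in stop_words."""
    if case_sensitive:
        return [t for t in tokens if t not in stop_words]
    lowered = {w.lower() for w in stop_words}
    return [t for t in tokens if t.lower() not in lowered]

print(remove_stop_words(["My", "professional", "resume"]))
```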



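In [118]:
# Strip whitespace from every token and drop empty tokens, storing the result in a new column `tt`.
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

strip_tokens = udf(lambda ws: [w.strip() for w in (ws or []) if w.strip()], ArrayType(StringType()))
df4 = df4.withColumn('tt', strip_tokens('rwords'))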

### Document featurization

In [90]:
from pyspark.ml.feature import HashingTF
hashingTF = HashingTF(numFeatures=1000, inputCol='rwords', outputCol='features')
df5 = hashingTF.transform(df4)
df5.select('features').show(1, False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                                                                                                                                |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(1000,[69,70,102,124,144,155,159,160,169,239,242,256,265,322,338,356,364,399,411,522,537,556,586,621,6

In [115]:
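#pyspark.ml.feature.Word2Vec
from pyspark.ml.feature import Word2Vec

# The original call, wv.fit(df3), produced the traceback below. Its root cause is a
# NullPointerException in the tokenizer UDF, most likely from rows whose `text` is null,
# so drop those rows and rebuild the token columns before fitting.
df_clean = remover.transform(rtokenizer.transform(df1.na.drop(subset=['text'])))
wv = Word2Vec(vectorSize=128, inputCol='rwords', outputCol='wvfeatures')
wvModel = wv.fit(df_clean)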

Py4JJavaError: An error occurred while calling o14840.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 898.0 failed 1 times, most recent failure: Lost task 2.0 in stage 898.0 (TID 3850, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$2: (string) => array<string>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.mllib.feature.Word2Vec.learnVocab(Word2Vec.scala:196)
	at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:309)
	at org.apache.spark.ml.feature.Word2Vec.fit(Word2Vec.scala:186)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$2: (string) => array<string>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.lang.NullPointerException
