# PySpark Quickstart

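[PySpark](http://spark.apache.org/docs/latest/api/python/index.html) is the Python client module for Spark. It lets you write and run Python scripts in a distributed Spark environment, extending Python's data analysis capabilities to large server clusters. This quickstart shows how to configure PySpark and inspect its parameters, then runs a few simple jobs that introduce the basic methods of data processing in a distributed environment.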

### Spark's Python client
The client consists of three main pieces: the `pyspark` module, `SparkConf`, and `SparkContext`. PySpark can run in local, cluster, or mixed mode.

In [1]:
import pyspark
from pyspark import SparkConf, SparkContext
sc = None

In [2]:
pyspark.status

<module 'pyspark.status' from '/root/anaconda3/envs/GISpark/lib/python3.5/site-packages/pyspark/status.py'>

In [3]:
#help(pyspark)

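### Startup configuration - SparkConf
Settings made through `SparkConf` have the same effect as the parameters passed on the command line at startup. For reference, see: http://my.oschina.net/u/2306127/blog/639414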

In [4]:
conf = (SparkConf()
         .setMaster("local")
         .setAppName("MyApp")
         .set("spark.executor.memory", "1g"))
print(conf)

<pyspark.conf.SparkConf object at 0x7f1789520550>


In [5]:
conf_kv = conf.getAll()
print(conf_kv)

[('spark.master', 'local'), ('spark.app.name', 'MyApp'), ('spark.executor.memory', '1g'), ('spark.submit.deployMode', 'client')]
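
To make the command-line equivalence concrete, here is a minimal sketch in plain Python (no Spark required) that turns the key/value pairs printed above into a `spark-submit` command line. The helper name `to_spark_submit_args` and the script name `my_app.py` are hypothetical and used only for illustration; `--master`, `--name`, `--deploy-mode`, and `--conf key=value` are standard `spark-submit` options.

```python
# Key/value pairs as returned by conf.getAll() in the cell above.
conf_kv = [
    ("spark.master", "local"),
    ("spark.app.name", "MyApp"),
    ("spark.executor.memory", "1g"),
    ("spark.submit.deployMode", "client"),
]

# Settings that have a dedicated spark-submit flag.
DEDICATED_FLAGS = {
    "spark.master": "--master",
    "spark.app.name": "--name",
    "spark.submit.deployMode": "--deploy-mode",
}


def to_spark_submit_args(pairs, script="my_app.py"):
    """Build a spark-submit argument list (hypothetical helper)."""
    args = ["spark-submit"]
    for key, value in pairs:
        if key in DEDICATED_FLAGS:
            args += [DEDICATED_FLAGS[key], value]
        else:
            # Any other setting can be passed as --conf key=value.
            args += ["--conf", "{}={}".format(key, value)]
    args.append(script)  # hypothetical application script
    return args


submit_args = to_spark_submit_args(conf_kv)
print(" ".join(submit_args))
```

Setting values in code is convenient in a notebook, while the command-line form suits scripts that are submitted to a cluster.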


In [6]:
#help(conf)

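### Starting the runtime environment - SparkContext
`SparkContext` is the container for everything at runtime: RDDs, broadcast variables, and jobs are all created through it.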

In [7]:
# Create the context only once; starting a second SparkContext in the same process raises an error.
if sc is None:
    sc = SparkContext(conf=conf)
    
print(type(sc))
print(sc)

<class 'pyspark.context.SparkContext'>
<pyspark.context.SparkContext object at 0x7f17895b5160>


In [8]:
#help(sc)

In [9]:
print(sc.applicationId)

local-1460953999586


### Creating data sequences

In [14]:
sc.range(5).collect()

[0, 1, 2, 3, 4]

In [11]:
sc.range(2, 4).collect()

[2, 3]

In [12]:
sc.range(1, 7, 2).collect()

[1, 3, 5]
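
`sc.range` can be called the same way as Python's built-in `range` (start, exclusive end, step), so the expected contents of each RDD can be checked in plain Python before involving Spark. The snippet below is only such a sanity check; the dictionary name `expected` is an illustrative choice.

```python
# Plain-Python counterparts of the three sc.range(...) calls above.
expected = {
    "sc.range(5)": list(range(5)),              # start defaults to 0
    "sc.range(2, 4)": list(range(2, 4)),        # end is exclusive
    "sc.range(1, 7, 2)": list(range(1, 7, 2)),  # step of 2
}

for call, values in expected.items():
    print(call, "->", values)
```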

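### Splitting data into partitions
The data is split into partitions that are distributed across the nodes, and each task works on one partition. `glom()` gathers the elements of each partition into a list, which makes the split visible.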

In [13]:
sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()

[[0], [2], [3], [4], [6]]

In [16]:
sc.parallelize(range(0, 6, 2), 5).glom().collect()

[[], [0], [], [2], [4]]
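
The empty partitions in the second result follow from how three elements are spread over five slices. The sketch below is a plain-Python model, not Spark's implementation: it assumes partition `i` covers the elements from index `i * n // num_slices` up to `(i + 1) * n // num_slices`, which reproduces both outputs above. The function name `slice_into_partitions` is hypothetical.

```python
def slice_into_partitions(data, num_slices):
    """Split a sequence into num_slices contiguous chunks (illustrative model)."""
    items = list(data)
    n = len(items)
    # Assumed boundary rule: partition i starts at index i * n // num_slices.
    bounds = [i * n // num_slices for i in range(num_slices + 1)]
    return [items[bounds[i]:bounds[i + 1]] for i in range(num_slices)]


five_of_five = slice_into_partitions([0, 2, 3, 4, 6], 5)
three_of_five = slice_into_partitions(range(0, 6, 2), 5)
print(five_of_five)
print(three_of_five)
```

When there are more slices than elements, several boundaries fall on the same index, so some partitions receive nothing.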

In [17]:
myRDD = sc.parallelize(range(6), 3)
sc.runJob(myRDD, lambda part: [x * x for x in part])

[0, 1, 4, 9, 16, 25]

In [18]:
myRDD = sc.parallelize(range(6), 3)
# Run only on partitions 0 and 2; the final True is the allowLocal argument.
sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)

[0, 1, 16, 25]
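
The two `runJob` results can be understood with a small plain-Python model: the function is applied to each selected partition as a whole, and the per-partition results are concatenated. This is a sketch of the semantics only, assuming `range(6)` in three slices is partitioned as `[[0, 1], [2, 3], [4, 5]]`; the names `run_job` and `square_partition` are hypothetical.

```python
def run_job(partitions, func, partition_ids=None):
    """Apply func to each selected partition and concatenate the results."""
    if partition_ids is None:
        partition_ids = range(len(partitions))  # default: every partition
    results = []
    for pid in partition_ids:
        # func receives an iterator over one partition's elements.
        results.extend(func(iter(partitions[pid])))
    return results


def square_partition(part):
    """Square every element of one partition."""
    return [x * x for x in part]


# Assumed layout of sc.parallelize(range(6), 3).
parts = [[0, 1], [2, 3], [4, 5]]

all_parts = run_job(parts, square_partition)
some_parts = run_job(parts, square_partition, [0, 2])
print(all_parts)
print(some_parts)
```

Restricting the job to partitions 0 and 2 skips the elements 2 and 3, which is why 4 and 9 are missing from the second result.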

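### Creating broadcast variables
A broadcast variable is a read-only value that is copied to every node once and cached there, so tasks can read it through `.value` without it being shipped with each task.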

In [19]:
b = sc.broadcast([1, 2, 3, 4, 5])
b.value
#[1, 2, 3, 4, 5]

[1, 2, 3, 4, 5]

In [20]:
sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
#[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
#b.unpersist()

[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
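
The `flatMap` result above can be reproduced in plain Python to see why the list appears twice: each of the two input elements is replaced by the broadcast list, and the resulting lists are flattened into one. This is a model of the semantics, not of how Spark distributes the work; `flat_map` and `broadcast_value` are hypothetical names standing in for `RDD.flatMap` and `b.value`.

```python
from itertools import chain

# Stands in for b.value, the list shared with every task.
broadcast_value = [1, 2, 3, 4, 5]


def flat_map(func, items):
    """Apply func to every item and flatten the resulting sequences."""
    return list(chain.from_iterable(func(x) for x in items))


# Each input element (the two zeros) is mapped to the whole broadcast list.
result = flat_map(lambda x: broadcast_value, [0, 0])
print(result)
```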

### Stopping the Spark runtime

In [21]:
sc.stop()