In [1]:
import pyspark
from pyspark.sql import SparkSession
#生成SparkSession实例
spark = SparkSession.builder \
     .master("local[*]") \
     .appName("test") \
     .config("spark.some.config.option", "some-value") \
     .getOrCreate()

## 1、df创建

### 1.1 读取文件

In [2]:
df1 = spark.read.csv("file:///home/ian/code/data/sparkml/doc_class.dat", sep='|', header=True)
df1.show(5)

+--------+----------+--------+--------------------+--------------------+
|myapp_id|typenameid|typename|          myapp_word|      myapp_word_all|
+--------+----------+--------+--------------------+--------------------+
| 1376533|         2|  action|game, android, world|game, android, wo...|
| 1376542|         2|  action|                game|game, app, enjoy,...|
| 1376603|         2|  action|run, tap, collect...|run, tap, collect...|
| 1376792|         2|  action|                 run|run, ath, game, m...|
| 1376941|         2|  action|fight, game, play...|fight, game, play...|
+--------+----------+--------+--------------------+--------------------+
only showing top 5 rows



### 1.2 list

In [4]:
df2 = spark.createDataFrame([[1, 2, 3],[4, 5, 6]], ["col1", "col2", "col0"])
df2.show()

+----+----+----+
|col1|col2|col0|
+----+----+----+
|   1|   2|   3|
|   4|   5|   6|
+----+----+----+



In [21]:
l = [('Alice', 18),('joey',27)]

In [22]:
spark.createDataFrame(l).collect()

[Row(_1='Alice', _2=18), Row(_1='joey', _2=27)]

In [23]:
spark.createDataFrame(l, ['name', 'age']).collect()

[Row(name='Alice', age=18), Row(name='joey', age=27)]

### 1.3 dict

In [24]:
d = [{'name': 'Alice', 'age': 18},{'name': 'Joey', 'age': 27}]
spark.createDataFrame(d).collect()



[Row(age=18, name='Alice'), Row(age=27, name='Joey')]

### 1.4 rdd
两种方式：  
* createDataFrame  
* rdd.toDF

In [26]:
rdd = spark.sparkContext.parallelize(l)
spark.createDataFrame(rdd).collect()

[Row(_1='Alice', _2=18), Row(_1='joey', _2=27)]

In [30]:
df = spark.createDataFrame(rdd, ['name', 'age'])
df.collect()

[Row(name='Alice', age=18), Row(name='joey', age=27)]

In [28]:
from pyspark.sql import Row
Person = Row('name', 'age')
person = rdd.map(lambda r: Person(*r))
df2 = spark.createDataFrame(person)
df2.collect()

[Row(name='Alice', age=18), Row(name='joey', age=27)]

In [44]:
person.toDF().show()

+-----+---+
| name|age|
+-----+---+
|Alice| 18|
| joey| 27|
+-----+---+



In [29]:
from pyspark.sql.types import *
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)])
df3 = spark.createDataFrame(rdd, schema)
df3.collect()

[Row(name='Alice', age=18), Row(name='joey', age=27)]

In [31]:
df.toPandas()

Unnamed: 0,name,age
0,Alice,18
1,joey,27


In [32]:
spark.createDataFrame(df.toPandas()).collect()

[Row(name='Alice', age=18), Row(name='joey', age=27)]

In [35]:
import pandas
pandas.DataFrame([[1, 2],[3,4]])

Unnamed: 0,0,1
0,1,2
1,3,4


In [36]:
spark.createDataFrame(pandas.DataFrame([[1, 2]])).collect()

[Row(0=1, 1=2)]

In [39]:
spark.createDataFrame(rdd, "a: string, b: int").collect()

[Row(a='Alice', b=18), Row(a='joey', b=27)]

In [40]:
rdd = rdd.map(lambda row: row[1])
spark.createDataFrame(rdd, "int").collect()

[Row(value=18), Row(value=27)]

In [47]:
spark.createDataFrame(rdd, "int").show()

+-----+
|value|
+-----+
|   18|
|   27|
+-----+



In [41]:
spark.createDataFrame(rdd, "boolean").collect()

Py4JJavaError: An error occurred while calling o666.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 71 in stage 67.0 failed 1 times, most recent failure: Lost task 71.0 in stage 67.0 (TID 2476, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ian/installed/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/home/ian/installed/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/ian/installed/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ian/installed/spark/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "/home/ian/installed/anaconda3/lib/python3.6/site-packages/pyspark-2.3.1-py3.6.egg/pyspark/sql/session.py", line 683, in prepare
    verify_func(obj)
  File "/home/ian/installed/anaconda3/lib/python3.6/site-packages/pyspark-2.3.1-py3.6.egg/pyspark/sql/types.py", line 1421, in verify
    verify_value(obj)
  File "/home/ian/installed/anaconda3/lib/python3.6/site-packages/pyspark-2.3.1-py3.6.egg/pyspark/sql/types.py", line 1415, in verify_default
    verify_acceptable_types(obj)
  File "/home/ian/installed/anaconda3/lib/python3.6/site-packages/pyspark-2.3.1-py3.6.egg/pyspark/sql/types.py", line 1310, in verify_acceptable_types
    % (dataType, obj, type(obj))))
TypeError: field value: BooleanType can not accept object 27 in type <class 'int'>

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3195)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3192)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3192)
	at sun.reflect.GeneratedMethodAccessor195.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ian/installed/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/home/ian/installed/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/ian/installed/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ian/installed/spark/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "/home/ian/installed/anaconda3/lib/python3.6/site-packages/pyspark-2.3.1-py3.6.egg/pyspark/sql/session.py", line 683, in prepare
    verify_func(obj)
  File "/home/ian/installed/anaconda3/lib/python3.6/site-packages/pyspark-2.3.1-py3.6.egg/pyspark/sql/types.py", line 1421, in verify
    verify_value(obj)
  File "/home/ian/installed/anaconda3/lib/python3.6/site-packages/pyspark-2.3.1-py3.6.egg/pyspark/sql/types.py", line 1415, in verify_default
    verify_acceptable_types(obj)
  File "/home/ian/installed/anaconda3/lib/python3.6/site-packages/pyspark-2.3.1-py3.6.egg/pyspark/sql/types.py", line 1310, in verify_acceptable_types
    % (dataType, obj, type(obj))))
TypeError: field value: BooleanType can not accept object 27 in type <class 'int'>

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [43]:
rdd1 = spark.sparkContext.parallelize([[1,2,3]])
df3 = spark.createDataFrame(rdd1)
df3.show()

+---+---+---+
| _1| _2| _3|
+---+---+---+
|  1|  2|  3|
+---+---+---+



### 1.5 range

In [7]:
df = spark.range(5)
df

DataFrame[id: bigint]

In [8]:
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



## 2 df常用函数

### 2.0 查看
show,head,first,take  
df行数：df.count()  

### 2.1 排序
orderBy和sort好像是等同的

In [16]:
df.sort('id',ascending=False).show()

+---+
| id|
+---+
|  4|
|  3|
|  2|
|  1|
|  0|
+---+



In [11]:
df.orderBy('id',ascending=False).show()

+---+
| id|
+---+
|  4|
|  3|
|  2|
|  1|
|  0|
+---+



In [12]:
df.orderBy(desc('id')).show()

NameError: name 'desc' is not defined

In [13]:
from pyspark.sql.functions import desc
df.orderBy(desc('id')).show()

+---+
| id|
+---+
|  4|
|  3|
|  2|
|  1|
|  0|
+---+



In [14]:
df.orderBy(df['id'].desc()).show()

+---+
| id|
+---+
|  4|
|  3|
|  2|
|  1|
|  0|
+---+



In [15]:
df.orderBy(df.id.desc()).show()

+---+
| id|
+---+
|  4|
|  3|
|  2|
|  1|
|  0|
+---+



### 描述性统计

In [18]:
df.describe().show()

+-------+------------------+
|summary|                id|
+-------+------------------+
|  count|                 5|
|   mean|               2.0|
| stddev|1.5811388300841898|
|    min|                 0|
|    max|                 4|
+-------+------------------+



In [50]:
df1 = df.cache()
df1

DataFrame[name: string, age: bigint]

In [49]:
df.persist()

DataFrame[name: string, age: bigint]