In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate()
sc = spark.sparkContext

In [4]:
df = spark.read.csv("测试数据/stu_score.txt", sep=',', header=False)
df

DataFrame[_c0: string, _c1: string, _c2: string]

In [4]:
df2 = df.toDF('id', 'name', 'score')
df2

DataFrame[id: string, name: string, score: string]

In [5]:
df2.printSchema(), df2.show()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- score: string (nullable = true)

+---+----+-----+
| id|name|score|
+---+----+-----+
|  1|语文|   99|
|  2|语文|   99|
|  3|语文|   99|
|  4|语文|   99|
|  5|语文|   99|
|  6|语文|   99|
|  7|语文|   99|
|  8|语文|   99|
|  9|语文|   99|
| 10|语文|   99|
| 11|语文|   99|
| 12|语文|   99|
| 13|语文|   99|
| 14|语文|   99|
| 15|语文|   99|
| 16|语文|   99|
| 17|语文|   99|
| 18|语文|   99|
| 19|语文|   99|
| 20|语文|   99|
+---+----+-----+
only showing top 20 rows


(None, None)

In [6]:
df2.createTempView(name='score') # 创建表 以SQL风格进行查询
spark.sql("""
    select * from score where name='语文' limit 5
""").show() # SQL风格

+---+----+-----+
| id|name|score|
+---+----+-----+
|  1|语文|   99|
|  2|语文|   99|
|  3|语文|   99|
|  4|语文|   99|
|  5|语文|   99|
+---+----+-----+


In [16]:
df2.where("name='语文'").limit(5).show() # 直接使用sparkAPI查询（DSL风格）

+---+----+-----+
| id|name|score|
+---+----+-----+
|  1|语文|   99|
|  2|语文|   99|
|  3|语文|   99|
|  4|语文|   99|
|  5|语文|   99|
+---+----+-----+


创建DataFrame

In [5]:
rdd = sc.textFile('测试数据/sql/people.txt').map(lambda x: x.split(',')).map(lambda x: (x[0], int(x[1])))

In [3]:
rdd.collect()

                                                                                

[('Michael', 29), ('Andy', 30), ('Justin', 19)]

In [4]:
# 指定要转换为DF的RDD； 指定列名，list形式给出
df = spark.createDataFrame(rdd, schema=['name', 'age'])

In [5]:
# 有RDD的数据类型推断出列类型 + 给定的列名 + 默认不为非空 得到schema信息
df.printSchema() 

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)


In [6]:
# 返回记录数，默认20； 是否对列的字符串长度超过20的截断 默认是True
df.show(5, False) 

+-------+---+
|name   |age|
+-------+---+
|Michael|29 |
|Andy   |30 |
|Justin |19 |
+-------+---+


In [7]:
# 对DF对象创建临时视图表 供SQL查询
df.createOrReplaceTempView("People")

In [8]:
spark.sql("select * from People where age<30").show()

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
| Justin| 19|
+-------+---+


In [6]:
# 定义StructType对象设置更加完整的schema
from pyspark.sql.types import StructType, StringType, IntegerType
schema = StructType().add("name", StringType(), nullable=False).add("age", IntegerType(), nullable=False)
df = spark.createDataFrame(rdd, schema=schema)

In [7]:
df.printSchema()

root
 |-- name: string (nullable = false)
 |-- age: integer (nullable = false)


In [8]:
df.show()

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+


                                                                                

直接使用rdd.toDF创建

In [10]:
df = rdd.toDF(['name', 'age'])
df.printSchema()
df.show()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+


In [12]:
df2 = rdd.toDF(schema=schema)
df2.printSchema()
df2.show()

root
 |-- name: string (nullable = false)
 |-- age: integer (nullable = false)

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+


使用Pandas的DF创建SparkSQL的DF

In [14]:
import pandas as pd
df_pandas = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ['zzj', 'mmy', 'ymm'],
    'age': [24, 24, 24]
})
df = spark.createDataFrame(df_pandas)
df.printSchema()
df.show()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

+---+----+---+
| id|name|age|
+---+----+---+
|  1| zzj| 24|
|  2| mmy| 24|
|  3| ymm| 24|
+---+----+---+


不同数据源统一读取为DF的方法

In [5]:
# text数据源，只管把一整行作为字符串 列明默认为Value 设置了.option("sep", ', ')也没用
schema = StructType().add("data", StringType(), nullable=False)
df = spark.read.format("text").schema(schema=schema).load("测试数据/sql/people.txt")
df.printSchema()
df.show()

root
 |-- data: string (nullable = true)

+-----------+
|       data|
+-----------+
|Michael, 29|
|   Andy, 30|
| Justin, 19|
+-----------+


In [11]:
# 读取json数据源  自带schema 可以识别
df = spark.read.format('json').load('测试数据/sql/people.json')
df.printSchema()
df.show()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


In [16]:
# 读取csv
df = spark.read.format("csv").option("sep",',').option('header', False).option('encoding', 'utf-8')\
                             .schema("name STRING, age INT, job STRING").load('测试数据/sql/people.csv')
df.printSchema()
df.show()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)

+------------------+----+----+
|              name| age| job|
+------------------+----+----+
|      name;age;job|NULL|NULL|
|Jorge;30;Developer|NULL|NULL|
|  Bob;32;Developer|NULL|NULL|
|  Ani;11;Developer|NULL|NULL|
|   Lily;11;Manager|NULL|NULL|
|  Put;11;Developer|NULL|NULL|
|   Alice;9;Manager|NULL|NULL|
|   Alice;9;Manager|NULL|NULL|
|   Alice;9;Manager|NULL|NULL|
|   Alice;9;Manager|NULL|NULL|
|    Alice;;Manager|NULL|NULL|
|          Alice;9;|NULL|NULL|
+------------------+----+----+


In [17]:
# 读取parquet数据源 （Spark常用的列式存储文件格式 类似Hive的ORC） 
# 对比text 1.内置schema 2. 列式存储 3. 序列化存储（体积小） 直接load()读取为DF
df = spark.read.format('parquet').load('测试数据/sql/users.parquet')
df.printSchema()
df.show()

root
 |-- name: string (nullable = true)
 |-- favorite_color: string (nullable = true)
 |-- favorite_numbers: array (nullable = true)
 |    |-- element: integer (containsNull = true)

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          NULL|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+


DSL风格的增删改查

In [2]:
df = spark.read.format("csv").schema("id INT, subject STRING, score INT").load('测试数据/stu_score.txt')

In [3]:
df.show(5)

                                                                                

+---+-------+-----+
| id|subject|score|
+---+-------+-----+
|  1|   语文|   99|
|  2|   语文|   99|
|  3|   语文|   99|
|  4|   语文|   99|
|  5|   语文|   99|
+---+-------+-----+


In [5]:
df.select(['id', 'subject']).show(5)

+---+-------+
| id|subject|
+---+-------+
|  1|   语文|
|  2|   语文|
|  3|   语文|
|  4|   语文|
|  5|   语文|
+---+-------+


In [7]:
df.select("id", "subject").show(5)

+---+-------+
| id|subject|
+---+-------+
|  1|   语文|
|  2|   语文|
|  3|   语文|
|  4|   语文|
|  5|   语文|
+---+-------+


In [8]:
# 获取Column对象 
id_col = df['id']
subject_col = df['subject']
id_col, subject_col

(Column<'id'>, Column<'subject'>)

In [11]:
df.select(id_col, subject_col).show(5)

+---+-------+
| id|subject|
+---+-------+
|  1|   语文|
|  2|   语文|
|  3|   语文|
|  4|   语文|
|  5|   语文|
+---+-------+


In [12]:
df.filter("score < 99").show(5)

+---+-------+-----+
| id|subject|score|
+---+-------+-----+
|  1|   数学|   96|
|  2|   数学|   96|
|  3|   数学|   96|
|  4|   数学|   96|
|  5|   数学|   96|
+---+-------+-----+


In [14]:
df.where('score < 99').show(5)

+---+-------+-----+
| id|subject|score|
+---+-------+-----+
|  1|   数学|   96|
|  2|   数学|   96|
|  3|   数学|   96|
|  4|   数学|   96|
|  5|   数学|   96|
+---+-------+-----+


In [15]:
df.filter(df['score'] < 99).show(5)

+---+-------+-----+
| id|subject|score|
+---+-------+-----+
|  1|   数学|   96|
|  2|   数学|   96|
|  3|   数学|   96|
|  4|   数学|   96|
|  5|   数学|   96|
+---+-------+-----+


In [16]:
df.groupBy("subject").count().show(5)

+-------+-----+
|subject|count|
+-------+-----+
|   英语|   30|
|   语文|   30|
|   数学|   30|
+-------+-----+


In [17]:
df.groupBy(df['subject']).count().show(5)

+-------+-----+
|subject|count|
+-------+-----+
|   英语|   30|
|   语文|   30|
|   数学|   30|
+-------+-----+


In [19]:
# 不是DF对象，用来进一步聚合操作 聚合后才是DF对象可以show
r = df.groupBy("subject")
print(type(r))

<class 'pyspark.sql.group.GroupedData'>


In [21]:
r.count().show()

+-------+-----+
|subject|count|
+-------+-----+
|   英语|   30|
|   语文|   30|
|   数学|   30|
+-------+-----+


In [22]:
# SQL风格
df.createTempView('score') # 注册临时视图
df.createOrReplaceTempView('score2') # 
df.createGlobalTempView('score3') # 全局表查询时需要前缀global_temp.

In [24]:
spark.sql("select subject, count(*) from score group by subject").show()
spark.sql("select subject, count(*) from score2 group by subject").show()
spark.sql("select subject, count(*) from global_temp.score3 group by subject").show()

+-------+--------+
|subject|count(1)|
+-------+--------+
|   英语|      30|
|   语文|      30|
|   数学|      30|
+-------+--------+

+-------+--------+
|subject|count(1)|
+-------+--------+
|   英语|      30|
|   语文|      30|
|   数学|      30|
+-------+--------+

+-------+--------+
|subject|count(1)|
+-------+--------+
|   英语|      30|
|   语文|      30|
|   数学|      30|
+-------+--------+


数据处理API

In [3]:
df = spark.read.format('csv').option('sep', ';').option('header', True).load('测试数据/sql/people.csv')
df.show()

+-----+----+---------+
| name| age|      job|
+-----+----+---------+
|Jorge|  30|Developer|
|  Bob|  32|Developer|
|  Ani|  11|Developer|
| Lily|  11|  Manager|
|  Put|  11|Developer|
|Alice|   9|  Manager|
|Alice|   9|  Manager|
|Alice|   9|  Manager|
|Alice|   9|  Manager|
|Alice|NULL|  Manager|
|Alice|   9|     NULL|
+-----+----+---------+


In [4]:
df.dropDuplicates().show() # 删除整行重复的（每个列都要相同）
df.dropDuplicates(['age', 'job']).show() # 指定不能重复的列

+-----+----+---------+
| name| age|      job|
+-----+----+---------+
|  Ani|  11|Developer|
|Alice|   9|  Manager|
| Lily|  11|  Manager|
|  Put|  11|Developer|
|Jorge|  30|Developer|
|  Bob|  32|Developer|
|Alice|   9|     NULL|
|Alice|NULL|  Manager|
+-----+----+---------+

+-----+----+---------+
| name| age|      job|
+-----+----+---------+
|Alice|NULL|  Manager|
|  Ani|  11|Developer|
| Lily|  11|  Manager|
|Jorge|  30|Developer|
|  Bob|  32|Developer|
|Alice|   9|     NULL|
|Alice|   9|  Manager|
+-----+----+---------+


In [9]:
# 缺失值处理
df.dropna().show() # 有空值就删除该行
df.dropna(thresh=1, subset=['age', 'job']).show() # subset中的有效列，必须有thresh个不为空，否则删除该行

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
|  Ani| 11|Developer|
| Lily| 11|  Manager|
|  Put| 11|Developer|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
+-----+---+---------+

+-----+----+---------+
| name| age|      job|
+-----+----+---------+
|Jorge|  30|Developer|
|  Bob|  32|Developer|
|  Ani|  11|Developer|
| Lily|  11|  Manager|
|  Put|  11|Developer|
|Alice|   9|  Manager|
|Alice|   9|  Manager|
|Alice|   9|  Manager|
|Alice|   9|  Manager|
|Alice|NULL|  Manager|
|Alice|   9|     NULL|
+-----+----+---------+


In [11]:
# 填充缺失值
df.fillna(value='N/A').show() # 全部缺失值都填为value
df.fillna(value='N/A', subset=['job']).show() # 指定subset列补充缺值为value
df.fillna({"name": '未知姓名', "age": 1, "job": "worker"}).show() # 都不同列 填指定的值

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
|  Ani| 11|Developer|
| Lily| 11|  Manager|
|  Put| 11|Developer|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
|Alice|N/A|  Manager|
|Alice|  9|      N/A|
+-----+---+---------+

+-----+----+---------+
| name| age|      job|
+-----+----+---------+
|Jorge|  30|Developer|
|  Bob|  32|Developer|
|  Ani|  11|Developer|
| Lily|  11|  Manager|
|  Put|  11|Developer|
|Alice|   9|  Manager|
|Alice|   9|  Manager|
|Alice|   9|  Manager|
|Alice|   9|  Manager|
|Alice|NULL|  Manager|
|Alice|   9|      N/A|
+-----+----+---------+

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
|  Ani| 11|Developer|
| Lily| 11|  Manager|
|  Put| 11|Developer|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
|Alice|  1|  Manager|
|Alice|  9|   worker|
+-----+---+----

写出API

In [None]:
import pyspark.sql.functions as F

schema = StructType().add("user_id", StringType(), nullable=True)\
                     .add("movie_id", StringType(), nullable=True)\
                     .add("rank", IntegerType(), nullable=True)\
                     .add("ts", StringType(), nullable=True)
df = spark.read.format("csv").option("sep", "\t").schema(schema=schema).load('测试数据/sql/u.data')

## 保存为text数据源，df只能是一列的 （读入也是df为一列）
df.select(F.concat_ws('-', 'user_id', 'movie_id', 'rank', "ts")).write\
                            .mode("overwrite").format('text')\
                            .option('sep', '-').option('header', True)\
                            .save('测试数据/save_df/text')

## csv默认是根据','作为sep的
df.write.format("csv").mode("overwrite").option("sep", ',').option('header', True).save("测试数据/save_df/csv")

## json不需要指定什么option
df.write.format('json').mode('overwrite').save("测试数据/save_df/json")

## 默认是'parquent'格式数据源保存
df.write.mode("overwrite").save('测试数据/save_df/parquent')