In [3]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# Create (or reuse an existing) Spark session named "demo" that every later cell shares.
spark_session = (
    SparkSession.builder
    .appName("demo")
    .config(conf=SparkConf())
    .getOrCreate()
)

In [4]:
def create_df_from_json_file(spark, path="./resources/people.json", min_age=17):
    """Walk through basic DataFrame operations on a JSON file of people.

    Reads ``path`` with ``spark.read.json`` and shows, in order: the rows, the
    schema, the ``name`` column, ``name`` alongside ``age + 1``, the rows with
    ``age > min_age``, the row count per ``age``, and finally the same
    ``age > min_age`` filter written as SQL against the temp view ``people``.

    Args:
        spark: an active ``SparkSession``.
        path: JSON file to load. Assumed to contain ``name`` and ``age``
            fields (the bundled demo data does; confirm for other inputs).
        min_age: age threshold shared by the DataFrame filter and the SQL
            query. It is coerced with ``int()`` before being interpolated into
            the SQL text, so both filters always use the same value.

    Side effects:
        Registers (or replaces) the temporary view ``people`` on ``spark``.
    """
    threshold = int(min_age)
    df = spark.read.json(path)
    # Full table contents.
    df.show()
    # Table schema.
    df.printSchema()
    # A single column.
    df.select("name").show()
    # Name alongside age + 1 (a null age stays null).
    df.select(df['name'], df['age'] + 1).show()
    # Only people older than the threshold.
    df.filter(df['age'] > threshold).show()
    # Number of people per age.
    df.groupBy(df['age']).count().show()

    # Same filter expressed in SQL: expose the DataFrame as a temp view first.
    df.createOrReplaceTempView("people")
    sql_df = spark.sql(f"select * from people where age > {threshold}")
    sql_df.show()

In [5]:
# Run the JSON DataFrame walkthrough on the shared session.
create_df_from_json_file(spark=spark_session)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+



## 从上面的结果可以看到，Michael 的年龄值为 null。Spark SQL 的 DataFrame 也有 `fillna()` 方法：调用 `df.fillna(13)` 或 `df.na.fill(13)` 即可将 null 值填充为 13（数值型的填充值只作用于数值类型的列，因此 `name` 列不受影响）。如果一个 DataFrame 中有多个列含有 null 值，也可以用 `df.na.fill({'列名1': 值1, '列名2': 值2})`（或等价的 `df.fillna({...})`）为各列分别指定填充值。

In [8]:
# Fill missing values: reload the people JSON and replace nulls with 13.
df_test = (
    spark_session.read
    .json("./resources/people.json")
    .fillna(13)
)
df_test.show()

+---+-------+
|age|   name|
+---+-------+
| 13|Michael|
| 30|   Andy|
| 19| Justin|
+---+-------+



In [11]:
from pyspark.sql import Row

def rdd_to_df_way_one(spark, path="./resources/people.txt"):
    """Build a DataFrame from a text RDD by letting Spark infer the schema from ``Row`` objects.

    Each non-blank line of ``path`` is expected to look like ``name,age``.
    Lines are parsed into ``Row(name=..., age=int(...))`` records and turned
    into a DataFrame, which is registered as the temp view ``people_df``,
    queried with SQL and shown. Finally the names are pulled back through the
    result's underlying RDD, collected to the driver and printed.

    Args:
        spark: an active ``SparkSession``.
        path: text file with one ``name,age`` record per line.

    Side effects:
        Registers (or replaces) the temporary view ``people_df`` on ``spark``.
    """
    sc = spark.sparkContext
    line_rdd = sc.textFile(path)
    # Skip blank lines: splitting one yields a single field, so x[1] would
    # raise IndexError and fail the whole job.
    peoples = (
        line_rdd
        .filter(lambda line: line.strip() != "")
        .map(lambda line: line.split(","))
        .map(lambda cols: Row(name=cols[0], age=int(cols[1])))
    )

    people_df = spark.createDataFrame(peoples)
    people_df.createOrReplaceTempView("people_df")

    data = spark.sql("select * from people_df")
    data.show()
    # `data` is a DataFrame; go through its RDD of Rows to extract one field.
    data_name = data.rdd.map(lambda x: x.name).collect()
    for name in data_name:
        print("name :", name)

In [12]:
# Schema inference via Row reflection, on the shared session.
rdd_to_df_way_one(spark=spark_session)

+---+-------+
|age|   name|
+---+-------+
| 29|Michael|
| 30|   Andy|
| 19| Justin|
+---+-------+

name : Michael
name : Andy
name : Justin


In [15]:
from pyspark.sql.types import *
def rdd_to_df_way_second(spark, path="./resources/people.txt"):
    """Build a DataFrame from a text RDD using a programmatically defined schema.

    Each non-blank line of ``path`` is expected to look like ``name,age``.
    Lines become ``(name, age)`` tuples of strings. A ``StructType`` with two
    nullable ``StringType`` columns (``name``, ``age``) is built from a
    space-separated field list, and the resulting DataFrame is registered as
    the temp view ``people_1``. Rows whose ``age`` equals 30 are then queried
    with SQL and shown.

    Args:
        spark: an active ``SparkSession``.
        path: text file with one ``name,age`` record per line.

    Side effects:
        Registers (or replaces) the temporary view ``people_1`` on ``spark``.
    """
    sc = spark.sparkContext
    # Read the file as an RDD of lines. Skip blank lines, which would
    # otherwise raise IndexError on cols[1].
    line = sc.textFile(path)
    parts = (
        line
        .filter(lambda text: text.strip() != "")
        .map(lambda text: text.split(","))
        # Keep the name as-is; trim surrounding whitespace from the age field.
        .map(lambda cols: (cols[0], cols[1].strip()))
    )

    # Build the schema programmatically: every column is a nullable string.
    schema_str = "name age"
    fields = [StructField(field_name, StringType(), True) for field_name in schema_str.split()]
    schema = StructType(fields)

    people_df = spark.createDataFrame(parts, schema)
    people_df.createOrReplaceTempView("people_1")

    # `age` is a string column here, so `age = 30` relies on Spark SQL's
    # implicit type coercion between string and integer.
    sql_result = spark.sql("select * from people_1 where age = 30")
    sql_result.show()

In [16]:
# Explicit StructType schema, on the shared session.
rdd_to_df_way_second(spark=spark_session)

+----+---+
|name|age|
+----+---+
|Andy| 30|
+----+---+



In [17]:
# Stop the shared Spark session now that the demo is finished; later cells cannot use it.
spark_session.stop()