In [1]:
from pyspark.sql import SparkSession

# Create (or reuse, via getOrCreate) the Spark session for the missing-data examples.
spark = (
    SparkSession.builder
    .appName('miss')
    .getOrCreate()
)

In [2]:
# Load the sample file: the first line holds column names, and column types are inferred from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('ContainsNull.csv')
)

In [3]:
# Display the rows (up to the default of 20) to see where the nulls are.
df.show(n=20)

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [4]:
# Print the column names and the types chosen by inferSchema=True.
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



#### 数据清除

根据一定的规则清除不完整（含有 null 值）的记录

In [6]:
# how='any' is the default: a row is dropped if any of its columns is null.
df.dropna(how='any').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [7]:
# Keep only rows that have at least 2 non-null values.
df.dropna(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



`thresh=2` 表示只保留至少有 2 个非 null 值的行；设置了 `thresh` 时会忽略 `how` 参数。`how` 的取值：`how='any'`（默认）表示某行只要有任一列为 null 就删除该行；`how='all'` 表示只有某行所有列都为 null 时才删除。

In [8]:
# Only the Name column is checked; nulls in other columns do not cause a drop.
df.dropna(how='any', subset=['Name']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp4|Cindy|456.0|
+----+-----+-----+



`subset=['Name']` 表示只检查 `Name` 列：`Name` 为 null 的行会被删除，其他列中的 null 不影响结果（例如 emp1 的 `Sales` 为 null 但仍被保留）。

除了删除，也可以对不完整的数据做填充，例如把 `Name` 列中的 null 替换为指定的字符串：

In [9]:
# Replace nulls in the Name column with a placeholder string.
# NOTE(review): 'Unknow' looks like a typo for 'Unknown'; kept as-is so the recorded output stays consistent.
df.fillna('Unknow', subset=['Name']).show()

+----+------+-----+
|  Id|  Name|Sales|
+----+------+-----+
|emp1|  John| null|
|emp2|Unknow| null|
|emp3|Unknow|345.0|
|emp4| Cindy|456.0|
+----+------+-----+



## Date and Timestamps

In [10]:
from pyspark.sql import SparkSession

# NOTE(review): a session was already created earlier in this notebook; getOrCreate()
# returns that existing session, so the 'dates' app name probably does not take effect — confirm.
spark = (
    SparkSession.builder
    .appName('dates')
    .getOrCreate()
)

In [14]:
# Load the stock price data; column names come from the header row, types are inferred.
df = (
    spark.read
    .options(inferSchema=True, header=True)
    .csv('appl_stock.csv')
)

In [15]:
# Peek at a single row to check the parsed columns.
df.show(n=1)

+-------------------+----------+----------+------------------+----------+---------+---------+
|               Date|      Open|      High|               Low|     Close|   Volume|Adj Close|
+-------------------+----------+----------+------------------+----------+---------+---------+
|2010-01-04 00:00:00|213.429998|214.499996|212.38000099999996|214.009998|123432400|27.727039|
+-------------------+----------+----------+------------------+----------+---------+---------+
only showing top 1 row



In [16]:
# Total number of rows loaded from appl_stock.csv.
df.count()

1762

In [17]:
# Fetch the first row as a Python Row object (head(n) delegates to take(n)).
df.take(1)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)]

In [18]:
from pyspark.sql.functions import (year, month, hour, 
                                  dayofmonth, dayofyear,
                                  weekofyear,
                                  format_number, date_format)

In [20]:
# Show the day-of-year and week-of-year extracted from Date, next to the raw value.
df.select(
    dayofyear(df.Date),
    weekofyear(df.Date),
    df.Date,
).show(n=2)

+---------------+----------------+-------------------+
|dayofyear(Date)|weekofyear(Date)|               Date|
+---------------+----------------+-------------------+
|              4|               1|2010-01-04 00:00:00|
|              5|               1|2010-01-05 00:00:00|
+---------------+----------------+-------------------+
only showing top 2 rows



这些日期/时间函数可以从时间戳列中抽取各类时间信息，例如年份、月份、一年中的第几天、一年中的第几周等。

In [24]:
# Derive a Year column from the Date timestamp for per-year aggregation.
newdf = df.withColumn('Year', year(df.Date))

In [29]:
# Average closing price per year. Passing 'Close' to mean() aggregates only that column;
# the resulting columns are ['Year', 'avg(Close)'], the same as averaging every numeric
# column and then selecting those two, but without the wasted aggregations.
res = newdf.groupBy('Year').mean('Close')

In [30]:
# Round the yearly average to 2 decimals (format_number returns a string) and give it a readable name.
res.select(
    'Year',
    format_number('avg(Close)', 2).alias('avg close'),
).show()

+----+---------+
|Year|avg close|
+----+---------+
|2015|   120.04|
|2013|   472.63|
|2014|   295.40|
|2012|   576.05|
|2016|   104.60|
|2010|   259.84|
|2011|   364.00|
+----+---------+

