# DataFrame基本操作

```{note}
本节介绍DataFrame的基本操作。<br/>
包括读写、projection&filter、列操作、聚合等。
```

## 读写

Reading and writing are simple in Spark thanks to its high-level abstractions and the community-contributed connectors for a wide variety of data sources.

In [1]:
import sys

from pyspark.sql import SparkSession

# Build the SparkSession shared by every later cell; getOrCreate() returns an
# already-active session instead of starting a second one.
spark = SparkSession.builder.appName("BasicOperation").getOrCreate()

In [2]:
from pyspark.sql.types import *

# Prefer an explicit schema over letting Spark infer one from the CSV, so that
# every column's type is fixed and visible in one place.
# Each (name, type) pair becomes a nullable StructField. The list order must
# match the CSV's column order, because Spark applies a user-supplied CSV
# schema by position.
fire_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("CallNumber", IntegerType()),
        ("UnitID", StringType()),
        ("IncidentNumber", IntegerType()),
        ("CallType", StringType()),
        ("CallDate", StringType()),
        ("WatchDate", StringType()),
        ("CallFinalDisposition", StringType()),
        ("AvailableDtTm", StringType()),
        ("Address", StringType()),
        ("City", StringType()),
        ("Zipcode", IntegerType()),
        ("Battalion", StringType()),
        ("StationArea", StringType()),
        ("Box", StringType()),
        ("OriginalPriority", StringType()),
        ("Priority", StringType()),
        ("FinalPriority", IntegerType()),
        ("ALSUnit", BooleanType()),
        ("CallTypeGroup", StringType()),
        ("NumAlarms", IntegerType()),
        ("UnitType", StringType()),
        ("UnitSequenceInCallDispatch", IntegerType()),
        ("FirePreventionDistrict", StringType()),
        ("SupervisorDistrict", StringType()),
        ("Neighborhood", StringType()),
        ("Location", StringType()),
        ("RowID", StringType()),
        ("Delay", FloatType()),
    ]
])

# Load the CSV through the DataFrameReader. The first line is a header row,
# and the columns are typed by fire_schema.
sf_fire_file = "../data/sf-fire-calls.csv"
fire_df = (spark.read
           .schema(fire_schema)
           .option("header", True)
           .csv(sf_fire_file))

In [3]:
# Save fire_df as a Parquet dataset (a directory of Parquet files).
# The writer's default save mode is "errorifexists", which makes a second run
# of this cell fail because the directory already exists. Overwrite any output
# left over from a previous run instead.
parquet_path = "../data/sf_parquet"
(fire_df.write
 .format("parquet")
 .mode("overwrite")
 .save(parquet_path))

## Projections and filters

In [13]:
from pyspark.sql.functions import *

# Keep three columns (projection) and only the rows whose CallType is not
# "Medical Incident" (filter). Rows with a null CallType are dropped as well,
# because comparing null with != yields null rather than true.
few_fire_df = (
    fire_df.select("IncidentNumber", "AvailableDtTm", "CallType")  # projection
    .filter(col("CallType") != "Medical Incident")  # filter; where() is an alias
)
few_fire_df.show(n=5, truncate=False)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



In [5]:
# Count how many distinct CallType values occur, via a single aggregation with
# countDistinct(). The result is shown under the alias "DistinctCallTypes".
(
    fire_df.where(col("CallType").isNotNull())
    .select("CallType")
    .agg(countDistinct("CallType").alias("DistinctCallTypes"))
    .show()
)

+-----------------+
|DistinctCallTypes|
+-----------------+
|               30|
+-----------------+



In [6]:
# Display up to 10 distinct, non-null CallType values without truncating them.
# distinct() removes duplicate rows; the order of the rows shown is not guaranteed.
(
    fire_df.where(col("CallType").isNotNull())
    .select("CallType")
    .distinct()
    .show(n=10, truncate=False)
)

+-----------------------------------+
|CallType                           |
+-----------------------------------+
|Elevator / Escalator Rescue        |
|Marine Fire                        |
|Aircraft Emergency                 |
|Confined Space / Structure Collapse|
|Administrative                     |
|Alarms                             |
|Odor (Strange / Unknown)           |
|Citizen Assist / Service Call      |
|HazMat                             |
|Watercraft in Distress             |
+-----------------------------------+
only showing top 10 rows



## 重命名、添加和删除列

In [7]:
# Rename the column "Delay" to "ResponseDelayedinMins". This returns a new
# DataFrame; fire_df itself is left unchanged.
new_fire_df = fire_df.withColumnRenamed(existing="Delay", new="ResponseDelayedinMins")

In [8]:
# Parse the string date columns into timestamp columns, then drop the strings:
#   CallDate      "MM/dd/yyyy"             -> IncidentDate
#   WatchDate     "MM/dd/yyyy"             -> OnWatchDate
#   AvailableDtTm "MM/dd/yyyy hh:mm:ss a"  -> AvailableDtTS (12-hour clock + AM/PM)
# The new columns are appended in this order, after the remaining original columns.
fire_ts_df = (
    new_fire_df
    .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
    .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
    .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a"))
    .drop("CallDate", "WatchDate", "AvailableDtTm")
)

In [9]:
# Preview the three newly converted timestamp columns (first 5 rows, untruncated).
fire_ts_df.select("IncidentDate", "OnWatchDate", "AvailableDtTS").show(
    n=5, truncate=False
)

+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows



In [10]:
# List every distinct year that appears in IncidentDate, in ascending order.
(
    fire_ts_df.select(year(col("IncidentDate")))
    .distinct()
    .orderBy(year(col("IncidentDate")))
    .show()
)

+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+



## 聚合

A handful of transformations and actions on DataFrames, such as groupBy(), orderBy(), and count(), offer the ability to aggregate by column names and then aggregate counts across them.

In [11]:
# The 10 most frequent non-null CallType values. Group by CallType, count the
# rows in each group (the aggregation step), and sort by that count, descending.
(
    fire_ts_df.select("CallType")
    .where(col("CallType").isNotNull())
    .groupBy("CallType")
    .count()
    .orderBy(col("count").desc())
    .show(n=10, truncate=False)
)

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



## 其他DataFrame操作

The DataFrame API, together with the functions in `pyspark.sql.functions`, provides descriptive statistics such as min(), max(), sum(), and avg().

In [12]:
import pyspark.sql.functions as F

# A single global aggregation over all rows: the total of NumAlarms and the
# mean of ResponseDelayedinMins.
fire_ts_df.agg(F.sum("NumAlarms"), F.avg("ResponseDelayedinMins")).show()

+--------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedinMins)|
+--------------+--------------------------+
|        176170|         3.892364154521585|
+--------------+--------------------------+

