In [1]:
import os

from pyspark.sql import SparkSession

In [2]:
# Directory (as a URI Spark can read) that holds both input datasets.
# Set the FLIGHT_DATA_DIR environment variable to point the notebook at another
# location; the default is the original local path, so existing setups are unaffected.
DATA_DIR = os.environ.get(
    "FLIGHT_DATA_DIR",
    "file:///home/shilinlee/workspace/shilinlee/blog/spark_python/dataframe",
).rstrip("/")  # tolerate a trailing slash in the override

# Departure-delay records (CSV) and the airport-code lookup table (tab-separated text).
flight_perf_file = DATA_DIR + "/departuredelays.csv"
airports_file = DATA_DIR + "/airport-codes-na.txt"

In [3]:
# Entry point for Spark SQL: reuse the session that is already active,
# or build a new one with default settings if none exists.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [4]:
# Load the airport dataset: tab-separated text with a header row,
# letting Spark infer the column types.
airports = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("sep", "\t")
    .csv(airports_file)
)
# Register the DataFrame as a temp view so spark.sql can refer to it as "airports".
airports.createOrReplaceTempView("airports")

In [5]:
# Load the departure-delay dataset (CSV with a header row). No schema inference
# is requested, so every column is read as a string.
flight_perf = (
    spark.read
    .option("header", "true")
    .csv(flight_perf_file)
)
# Register the DataFrame as a temp view so spark.sql can refer to it as "FlightPerformance".
flight_perf.createOrReplaceTempView("FlightPerformance")
# Mark the DataFrame for caching, since the queries below reuse it. Kept as the
# last expression so the returned DataFrame is what the cell displays.
flight_perf.cache()

DataFrame[date: string, delay: string, distance: string, origin: string, destination: string]

In [6]:
# Total flight delay for Washington State, broken down by city and origin airport code.
# This requires joining the flight-performance data to the airport data on the
# International Air Transport Association (IATA) code.
# Rows are sorted from the largest total delay down.
# Note: `delay` was loaded as a string column, so sum() relies on Spark's implicit
# cast to double (hence the ".0" in the displayed totals).
spark.sql("""
select a.City, f.origin, sum(f.delay) as Delays 
from FlightPerformance as f
join airports as a on a.IATA = f.origin  
where a.State = 'WA'
group by a.City, f.origin
order by sum(f.delay) desc
""").show()

+-------+------+--------+
|   City|origin|  Delays|
+-------+------+--------+
|Seattle|   SEA|159086.0|
|Spokane|   GEG| 12404.0|
|  Pasco|   PSC|   949.0|
+-------+------+--------+



In [7]:
# Total flight delay per state for airports in the USA.
# This sums the `delay` column of every matching flight; it does not count flights.
# There is no ORDER BY, so the row order is not guaranteed, and show() displays
# only the first 20 rows. Airports without a State value are grouped under null.
spark.sql("""
select a.State, sum(f.delay) as Delays 
from FlightPerformance as f
join airports as a on a.IATA = f.origin  
where a.Country = 'USA'
group by a.State
""").show()

+-----+---------+
|State|   Delays|
+-----+---------+
|   SC|  80666.0|
|   AZ| 401793.0|
|   LA| 199136.0|
|   MN| 256811.0|
|   NJ| 452791.0|
|   OR| 109333.0|
|   VA|  98016.0|
| null| 397237.0|
|   RI|  30760.0|
|   WY|  15365.0|
|   KY|  61156.0|
|   NH|  20474.0|
|   MI| 366486.0|
|   NV| 474208.0|
|   WI| 152311.0|
|   ID|  22932.0|
|   CA|1891919.0|
|   CT|  54662.0|
|   NE|  59376.0|
|   MT|  19271.0|
+-----+---------+
only showing top 20 rows



**For more information, please refer to:**
- [Spark SQL, DataFrames and Datasets Guide](http://spark.apache.org/docs/latest/sql-programming-guide.html#sql)
- [PySpark SQL Module: DataFrame](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame)
- [PySpark SQL Functions Module](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions)

In [9]:
# Print the query plans for the per-state delay query to the console, for debugging.

# explain() parameter:
# extended – boolean, default False. If False, only the physical plan is printed.
# Passing True, as here, also prints the parsed, analyzed and optimized logical plans.
spark.sql("""
select a.State, sum(f.delay) as Delays 
from FlightPerformance as f
join airports as a on a.IATA = f.origin  
where a.Country = 'USA'
group by a.State
""").explain(True)

== Parsed Logical Plan ==
'Aggregate ['a.State], ['a.State, 'sum('f.delay) AS Delays#187]
+- 'Filter ('a.Country = USA)
   +- 'Join Inner, ('a.IATA = 'f.origin)
      :- 'SubqueryAlias `f`
      :  +- 'UnresolvedRelation `FlightPerformance`
      +- 'SubqueryAlias `a`
         +- 'UnresolvedRelation `airports`

== Analyzed Logical Plan ==
State: string, Delays: double
Aggregate [State#11], [State#11, sum(cast(delay#29 as double)) AS Delays#187]
+- Filter (Country#12 = USA)
   +- Join Inner, (IATA#13 = origin#31)
      :- SubqueryAlias `f`
      :  +- SubqueryAlias `flightperformance`
      :     +- Relation[date#28,delay#29,distance#30,origin#31,destination#32] csv
      +- SubqueryAlias `a`
         +- SubqueryAlias `airports`
            +- Relation[City#10,State#11,Country#12,IATA#13] csv

== Optimized Logical Plan ==
Aggregate [State#11], [State#11, sum(cast(delay#29 as double)) AS Delays#187]
+- Project [delay#29, State#11]
   +- Join Inner, (IATA#13 = origin#31)
      :- Project 