In [None]:
from datetime import datetime
from pyspark.sql.functions import col, desc, asc, max

from pyspark.sql import SparkSession

# Local Spark session that uses every core available on this machine.
spark = (
    SparkSession.builder
    .appName("Data Engineer Assignment")
    .master("local[*]")
    .getOrCreate()
)

# Output folder name is stamped with today's date (YYYYMMDD) and the author's initials.
initials = "NHA"
today = datetime.today().strftime("%Y%m%d")
output_folder = f"../data/processed/longest_runways_{today}_{initials}"


def _read_csv(path):
    """Load a CSV file with a header row into a Spark DataFrame.

    No schema inference is requested, so Spark returns every column as a string;
    numeric columns must be cast explicitly downstream.
    """
    return spark.read.option("header", True).csv(path)


# On a Databricks (DBFS) cluster, read the same three files from /mnt/data/
# (e.g. "/mnt/data/countries.csv") instead of the local ../data/raw/ folder.
countries_df = _read_csv("../data/raw/countries.csv")
airports_df = _read_csv("../data/raw/airports.csv")
runways_df = _read_csv("../data/raw/runways.csv")

# --- Countries with the most / fewest airports ------------------------------
# Count airports per ISO country code, then right-join onto the full country list
# so that countries with no airports at all are still present. For those countries
# the join yields a null "count"; fill it with 0 so they rank as having the fewest
# airports with an explicit count, instead of sorting first as null.
airport_counts = airports_df.groupBy("iso_country").count()
joined_with_country = airport_counts.join(
    countries_df,
    airport_counts.iso_country == countries_df.code,
    "right",
).fillna(0, subset=["count"])

# Many countries share the same airport count, so break ties by country name;
# otherwise limit() would pick an arbitrary subset of the tied rows on each run.
top_3 = joined_with_country.orderBy(desc("count"), asc("name")).limit(3)
bottom_10 = joined_with_country.orderBy(asc("count"), asc("name")).limit(10)

top_3.select("name", "count").show()
bottom_10.select("name", "count").show()

# --- Longest runway per country ---------------------------------------------
# Attach each runway to its airport (runways.airport_ref == airports.id) and keep
# the country, airport name and runway dimensions. The CSVs are read without a
# schema, so the dimensions are cast to int here.
# Only runways with an unknown length are dropped: a runway whose width is missing
# can still be its country's longest, and dropping it would report a shorter one.
joined_df = runways_df.join(
    airports_df,
    runways_df.airport_ref == airports_df.id,
    "inner"
).select(
    airports_df.iso_country,
    airports_df.name.alias("airport_name"),
    runways_df.length_ft.cast("int").alias("length_ft"),
    runways_df.width_ft.cast("int").alias("width_ft")
).na.drop(subset=["length_ft"])

# Maximum runway length per country. Note: `max` is pyspark.sql.functions.max
# (imported at the top of the file, shadowing the Python builtin).
max_length_df = joined_df.groupBy("iso_country").agg(
    max("length_ft").alias("max_length")
)

# Alias both sides so the self-join columns can be referenced unambiguously.
joined_aliased = joined_df.alias("j")
max_aliased = max_length_df.alias("m")

# Keep every runway whose length equals its country's maximum; ties produce
# several rows per country. distinct() removes exact duplicate rows.
# The result feeds three actions below (count, show, write), so cache it to avoid
# recomputing the whole join for each one.
longest_runways_all = joined_aliased.join(
    max_aliased,
    (col("j.iso_country") == col("m.iso_country")) &
    (col("j.length_ft") == col("m.max_length")),
    "inner"
).select(
    col("j.iso_country"),
    col("j.airport_name"),
    col("j.length_ft"),
    col("j.width_ft")
).distinct().cache()

# Sort once (country, then airport name for a stable order within ties) and reuse
# the sorted frame for both the full on-screen listing and the parquet output.
longest_runways_sorted = longest_runways_all.orderBy("iso_country", "airport_name")
longest_runways_sorted.show(n=longest_runways_all.count(), truncate=False)

longest_runways_sorted \
    .write.mode("overwrite") \
    .parquet(output_folder)

+-----------+-----+
|iso_country|count|
+-----------+-----+
|         IO|    1|
|         NR|    1|
|         GM|    1|
|         JE|    1|
|         YT|    1|
|         BL|    1|
|         SX|    1|
|         NU|    1|
|         GI|    1|
|         CW|    1|
|         VA|    1|
|         CX|    1|
|         CC|    1|
|         NF|    1|
|         RE|    2|
|         AI|    2|
|         PM|    2|
|         SM|    2|
|         LI|    2|
|         MS|    2|
+-----------+-----+
only showing top 20 rows

+-----------+-------------------------------------------------------------+---------+--------+
|iso_country|airport_name                                                 |length_ft|width_ft|
+-----------+-------------------------------------------------------------+---------+--------+
|AE         |Al Maktoum International Airport                             |14764    |197     |
|AF         |Bagram Airfield                                              |11820    |151     |
|AG         |V.C. B

25/06/16 11:59:32 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1326627 ms exceeds timeout 120000 ms
25/06/16 11:59:32 WARN SparkContext: Killing executors is not supported by current scheduler.
25/06/16 11:59:32 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$