In [1]:
import findspark
findspark.init()

import pyspark

In [2]:
# Create a entry point to our spark cluster.
sc = pyspark.SparkContext(appName='flights')

In [3]:
# Create interface to our spark cluster.
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [4]:
import pandas as pd
import numpy as np

In [5]:
flights = spark.read.csv("flights_small.csv", inferSchema=True, header=True)
airports = spark.read.csv("airports.csv", inferSchema=True, header=True)
planes = spark.read.csv("planes.csv", inferSchema=True, header=True)

In [6]:
# Check data types for these guys.
print('####### flights #######')
print(flights.printSchema())
print('####### airports #######')
print(airports.printSchema())
print('####### planes #######')
print(planes.printSchema())

####### flights #######
root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)

None
####### airports #######
root
 |-- faa: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- tz: integer (nullable = true)
 |-- dst: string (nullable = true)

None
####### planes #######
root
 |-- tailnum: string (nullable = tr

#### Okay so we've got some integers imported correctely, which is good. inferSchema() method seems to work fine.

In [7]:
print('Oldest flight')
flights.select('year').groupby().max().show()

print('Most recent flight')
flights.select('year').groupby().min().show()

Oldest flight
+---------+
|max(year)|
+---------+
|     2014|
+---------+

Most recent flight
+---------+
|min(year)|
+---------+
|     2014|
+---------+



**Okay here we're doing some selections and filters to get familiar with the datasets.**

In [8]:
print('Average distance covered')
flights.select('distance').groupby().mean().show()

print('Maximum distance covered')
flights.select('distance').groupby().max().show()

print('Minimum distance covered')
flights.select('distance').groupby().min().show()

Average distance covered
+-------------+
|avg(distance)|
+-------------+
|    1208.1516|
+-------------+

Maximum distance covered
+-------------+
|max(distance)|
+-------------+
|         2724|
+-------------+

Minimum distance covered
+-------------+
|min(distance)|
+-------------+
|           93|
+-------------+



#### We might be then interested in capturing details of the longest flight. It's origin, it's destination, the plane that was involved in this flight etc... Typically this will be done by doing some joins on other datasets (planes and airports) 

In [9]:
temp_longest = flights.groupby('tailnum')
temp_longest
longest_fight_temp = temp_longest.max('distance')
longest_fight_temp.select('tailnum').first()

Row(tailnum='N442AS')

In [10]:
longest_distance_flight = longest_fight_temp.join(planes, on='tailnum', how='inner')

In [11]:
longest_distance_flight.show()

+-------+-------------+----+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|max(distance)|year|                type|    manufacturer|   model|engines|seats|speed|   engine|
+-------+-------------+----+--------------------+----------------+--------+-------+-----+-----+---------+
| N102UW|         2282|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N105UW|         2279|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N107US|         2279|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N108UW|         2279|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N109UW|         2282|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N110UW|         2378|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N111US|         2282|1999|Fixed wing multi .

In [None]:
# Distinct destinations.

distinct_origins = flights.select('origin').distinct()
distinct_origins.show()

# Distinct origins.
distinct_dest = flights.select('dest').distinct()
distinct_dest.show()
print('Number of unique destinations: ', distinct_dest.count())

In [183]:
flights.withColumn('arr_delay', flights.arr_delay.cast('double'))

DataFrame[year: int, month: int, day: int, dep_time: string, dep_delay: string, arr_time: string, arr_delay: double, carrier: string, tailnum: string, flight: int, origin: string, dest: string, air_time: string, distance: int, hour: string, minute: string]

In [182]:
flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

In [184]:
late = flights.arr_delay > 0
late_flights = flights.groupby().max('arr_delay')

AnalysisException: '"arr_delay" is not a numeric column. Aggregation function can only be applied on a numeric column.;'