# NYC Flights data 2013 with Weather data

Reference : https://github.com/rich-iannone/so-many-pyspark-examples/blob/main/spark-dataframes.ipynb



In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .config("spark.executor.memory", "8g") \
        .appName("Convert CSV to parquet") \
        .master("spark://b2-120-gra11:7077") \
        .getOrCreate()


22/12/05 17:02:44 WARN Utils: Your hostname, b2-120-gra11 resolves to a loopback address: 127.0.1.1; using 141.94.168.194 instead (on interface ens3)
22/12/05 17:02:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/05 17:02:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Joins

Joins are easily performed with Spark DataFrames. The expression is:

`join(other, on = None, how = None)`

where:
- other: a DataFrame that serves as the right side of the join
- on: typically a join expression
- how: the default is `inner` but there are also `inner`, `outer`, `left_outer`, `right_outer`, and `leftsemi` joins available

Let's load in some more data so that we can have two DataFrames to join. The **CSV** file `weather.csv` contains hourly meteorological data from EWR during 2013. `nycflights2013.csv` contains flights data duringthe same period.
Lets create nycflights2013 using a schema object made with `pyspark.sql.type``

In [2]:
from pyspark.sql.types import *  # Necessary for creating schemas
from pyspark.sql.functions import * # Importing PySpark functions

nycflights_schema = StructType([
  StructField('year', IntegerType(), True),
  StructField('month', IntegerType(), True),
  StructField('day', IntegerType(), True),
  StructField('dep_time', StringType(), True),
  StructField('dep_delay', IntegerType(), True),
  StructField('arr_time', StringType(), True),
  StructField('arr_delay', IntegerType(), True),
  StructField('carrier', StringType(), True),
  StructField('tailnum', StringType(), True),
  StructField('flight', StringType(), True),  
  StructField('origin', StringType(), True),
  StructField('dest', StringType(), True),
  StructField('air_time', IntegerType(), True),
  StructField('distance', IntegerType(), True),
  StructField('hour', IntegerType(), True),
  StructField('minute', IntegerType(), True)
  ])

# ...and then read the CSV with the schema
nycflights13_csv = spark.read.csv("file:///srv/data/nycflights/nycflights13.csv", schema = nycflights_schema )
nycflights13_csv.show()

                                                                                

+----+-----+----+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month| day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+----+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|null| null|null|dep_time|     null|arr_time|     null|carrier|tailnum|flight|origin|dest|    null|    null|null|  null|
|2013|    1|   1|     517|        2|     830|       11|     UA| N14228|  1545|   EWR| IAH|     227|    1400|   5|    17|
|2013|    1|   1|     533|        4|     850|       20|     UA| N24211|  1714|   LGA| IAH|     227|    1416|   5|    33|
|2013|    1|   1|     542|        2|     923|       33|     AA| N619AA|  1141|   JFK| MIA|     160|    1089|   5|    42|
|2013|    1|   1|     544|       -1|    1004|      -18|     B6| N804JB|   725|   JFK| BQN|     183|    1576|   5|    44|
|2013|    1|   1|     554|      

### Create a proper timestamp.

We have all the components: `year`, `month`, `day`, `hour`, and `minute`.

Use `concat_ws()` (concatentate with separator) to combine column data into StringType columns such that dates (`-` separator, YYYY-MM-DD) and times (`:` separator, 24-hour time) are formed

In [3]:
nycflights13 = \
(nycflights13_csv
 .withColumn('date',
             concat_ws('-',
                       nycflights13_csv.year,
                       nycflights13_csv.month,
                       nycflights13_csv.day))
 .withColumn('time',
             concat_ws(':',
                       nycflights13_csv.hour,
                       nycflights13_csv.minute)))

In a second step, concatenate with `concat_ws()` the `date` and `time` strings (separator is a space); then drop several columns

In [4]:
nycflights13 = \
(nycflights13
 .withColumn('timestamp',
             concat_ws(' ',
                       nycflights13.date,
                       nycflights13.time))
 .drop('date')     # `drop()` doesn't accept a list of column names, therefore, for every column, 
 .drop('minute')   # we would like to remove from the DataFrame, we must create a new `drop()`
 .drop('time'))    # statement

# In the final step, convert the `timestamp` from
# a StringType into a TimestampType
nycflights13 = \
(nycflights13
 .withColumn('timestamp',
             to_utc_timestamp(nycflights13.timestamp, 'GMT')))

Create a schema object and then read the CSV with the schema

In [5]:
weather_schema = StructType([  
  StructField('year', IntegerType(), True),
  StructField('month', IntegerType(), True),
  StructField('day', IntegerType(), True),
  StructField('hour', IntegerType(), True),
  StructField('temp', FloatType(), True),
  StructField('dewp', FloatType(), True),
  StructField('humid', FloatType(), True),
  StructField('wind_dir', IntegerType(), True),
  StructField('wind_speed', FloatType(), True),
  StructField('wind_gust', FloatType(), True),
  StructField('precip', FloatType(), True),
  StructField('pressure', FloatType(), True),
  StructField('visib', FloatType(), True)
  ])


weather = spark.read.csv("file:///srv/data/nycflights/weather.csv", schema = weather_schema)

In [6]:
weather.show()

+----+-----+----+----+-----+-----+-----+--------+----------+---------+------+--------+-----+
|year|month| day|hour| temp| dewp|humid|wind_dir|wind_speed|wind_gust|precip|pressure|visib|
+----+-----+----+----+-----+-----+-----+--------+----------+---------+------+--------+-----+
|null| null|null|null| null| null| null|    null|      null|     null|  null|    null| null|
|2013|    1|   1|   0|37.04|21.92|53.97|     230|  10.35702|11.918652|   0.0|  1013.9| 10.0|
|2013|    1|   1|   1|37.04|21.92|53.97|     230|  13.80936|15.891536|   0.0|  1013.0| 10.0|
|2013|    1|   1|   2|37.94|21.92|52.09|     230|  12.65858|14.567241|   0.0|  1012.6| 10.0|
|2013|    1|   1|   3|37.94| 23.0|54.51|     230|  13.80936|15.891536|   0.0|  1012.7| 10.0|
|2013|    1|   1|   4|37.94|24.08|57.04|     240|  14.96014| 17.21583|   0.0|  1012.8| 10.0|
|2013|    1|   1|   6|39.02|26.06|59.37|     270|  10.35702|11.918652|   0.0|  1012.0| 10.0|
|2013|    1|   1|   7|39.02|26.96|61.63|     250|   8.05546| 9.270062|

                                                                                

Join the `nycflights` DF with the `weather` DF 

In [7]:
nycflights_all_columns = \
(nycflights13
 .join(weather,
       [nycflights13.month == weather.month, # three join conditions: month,
        nycflights13.day == weather.day,     #                        day,
        nycflights13.hour == weather.hour],  #                        hour
       'left_outer')) # left outer join: keep all rows from the left DF (flights), with the matching rows in the right DF (weather)
                      # NULLs created if there is no match to the right DF

In [8]:
nycflights_all_columns.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- temp: float (nullable = true)
 |-- dewp: float (nullable = true)
 |-- humid: float (nullable = true)
 |-- wind_dir: integer (nullable = true)
 |-- wind_speed: float (nullable = true)
 |-- wind_gust: float (nullable

One way to reduce the number of extraneous columns is to use a `select()` statement

In [9]:
nycflights_wind_visib = \
(nycflights_all_columns
 .select(['timestamp', 'carrier', 'flight',
          'origin', 'dest', 'wind_dir',
          'wind_speed', 'wind_gust', 'visib']))

In [10]:
nycflights_wind_visib.schema.fields

[StructField('timestamp', TimestampType(), True),
 StructField('carrier', StringType(), True),
 StructField('flight', StringType(), True),
 StructField('origin', StringType(), True),
 StructField('dest', StringType(), True),
 StructField('wind_dir', IntegerType(), True),
 StructField('wind_speed', FloatType(), True),
 StructField('wind_gust', FloatType(), True),
 StructField('visib', FloatType(), True)]

Let's load in even more data so we can determine if any takeoffs occurred in very windy weather.

The **CSV** `beaufort_land.csv` contains Beaufort scale values (the `force` column), wind speed ranges in *mph*, and the name for each wind force.

In [11]:
# Create a schema object... 
beaufort_land_schema = StructType([  
  StructField('force', IntegerType(), True),
  StructField('speed_mi_h_lb', IntegerType(), True),
  StructField('speed_mi_h_ub', IntegerType(), True),
  StructField('name', StringType(), True)
  ])

# ...and then read the CSV with the schema
beaufort_land = spark.read.csv('/srv/data/nycflights/beaufort_land.csv', 
                               header = True, schema = beaufort_land_schema)

In [12]:
beaufort_land.show()

+-----+-------------+-------------+---------------+
|force|speed_mi_h_lb|speed_mi_h_ub|           name|
+-----+-------------+-------------+---------------+
|    0|            0|            1|           calm|
|    1|            1|            4|      light air|
|    2|            5|            7|   light breeze|
|    3|            8|           11|  gentle breeze|
|    4|           12|           18|moderate breeze|
|    5|           19|           24|   fresh breeze|
|    6|           25|           31|  strong breeze|
|    7|           32|           38|      near gale|
|    8|           39|           46|           gale|
|    9|           47|           54|    strong gale|
|   10|           55|           63|          storm|
|   11|           64|           73|  violent storm|
|   12|           74|         null|      hurricane|
+-----+-------------+-------------+---------------+



Join the current working DF with the `beaufort_land` DF and use join expressions that use the WS ranges

In [13]:
nycflights_wind_visib_beaufort = \
(nycflights_wind_visib
 .join(beaufort_land,
      [nycflights_wind_visib.wind_speed >= beaufort_land.speed_mi_h_lb,
       nycflights_wind_visib.wind_speed < beaufort_land.speed_mi_h_ub],
       'left_outer')
 .withColumn('month', month(nycflights_wind_visib.timestamp)) # Create a month column from `timestamp` values
 .drop('speed_mi_h_lb')
 .drop('speed_mi_h_ub')
)

In [14]:
nycflights_wind_visib_beaufort.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- wind_dir: integer (nullable = true)
 |-- wind_speed: float (nullable = true)
 |-- wind_gust: float (nullable = true)
 |-- visib: float (nullable = true)
 |-- force: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- month: integer (nullable = true)



In [15]:
nycflights_wind_visib_beaufort.filter("name IS NOT NULL").show()

[Stage 5:>                                                          (0 + 1) / 1]

+-------------------+-------+------+------+----+--------+----------+---------+-----+-----+-------------+-----+
|          timestamp|carrier|flight|origin|dest|wind_dir|wind_speed|wind_gust|visib|force|         name|month|
+-------------------+-------+------+------+----+--------+----------+---------+-----+-----+-------------+-----+
|2013-01-01 06:00:00|     B6|   371|   LGA| FLL|     270|  10.35702|11.918652| 10.0|    3|gentle breeze|    1|
|2013-01-01 06:00:00|     MQ|  4650|   LGA| ATL|     270|  10.35702|11.918652| 10.0|    3|gentle breeze|    1|
|2013-01-01 06:01:00|     B6|   343|   EWR| PBI|     270|  10.35702|11.918652| 10.0|    3|gentle breeze|    1|
|2013-01-01 06:02:00|     DL|  1919|   LGA| MSP|     270|  10.35702|11.918652| 10.0|    3|gentle breeze|    1|
|2013-01-01 06:02:00|     MQ|  4401|   LGA| DTW|     270|  10.35702|11.918652| 10.0|    3|gentle breeze|    1|
|2013-01-01 06:06:00|     AA|  1895|   EWR| MIA|     270|  10.35702|11.918652| 10.0|    3|gentle breeze|    1|
|

                                                                                

In [16]:
nycflights_wind_visib_beaufort.filter("NOT name IS NULL").show()

                                                                                

+-------------------+-------+------+------+----+--------+----------+---------+-----+-----+-------------+-----+
|          timestamp|carrier|flight|origin|dest|wind_dir|wind_speed|wind_gust|visib|force|         name|month|
+-------------------+-------+------+------+----+--------+----------+---------+-----+-----+-------------+-----+
|2013-01-01 06:00:00|     B6|   371|   LGA| FLL|     270|  10.35702|11.918652| 10.0|    3|gentle breeze|    1|
|2013-01-01 06:00:00|     MQ|  4650|   LGA| ATL|     270|  10.35702|11.918652| 10.0|    3|gentle breeze|    1|
|2013-01-01 06:01:00|     B6|   343|   EWR| PBI|     270|  10.35702|11.918652| 10.0|    3|gentle breeze|    1|
|2013-01-01 06:02:00|     DL|  1919|   LGA| MSP|     270|  10.35702|11.918652| 10.0|    3|gentle breeze|    1|
|2013-01-01 06:02:00|     MQ|  4401|   LGA| DTW|     270|  10.35702|11.918652| 10.0|    3|gentle breeze|    1|
|2013-01-01 06:06:00|     AA|  1895|   EWR| MIA|     270|  10.35702|11.918652| 10.0|    3|gentle breeze|    1|
|

We can inspect the number of potentially dangerous
takeoffs (i.e., where the Beaufort force is high)
month-by-month through the use of the `crosstab()` function

In [17]:
crosstab_month_force = \
(nycflights_wind_visib_beaufort
 .crosstab('month', 'force'))

                                                                                

In [18]:
crosstab_month_force.show()

+-----------+----+----+----+----+----+----+---+---+---+----+
|month_force|   0|   1|   2|   3|   4|   5|  6|  7|  8|null|
+-----------+----+----+----+----+----+----+---+---+---+----+
|       null|   0|   0|   0|   0|   0|   0|  0|  0|  0|  30|
|          5|2758|1688|6620|7334|3634| 736|217| 38|  0|5770|
|         10|3140|2430|5086|5647|6053| 374| 50|  0|  0|6108|
|          1|1911|1176|5047|6403|5205|1117|521|164| 92|5368|
|          6|1389|1864|5140|7305|5744| 704|  1| 39|  0|6056|
|          9|1716|2142|5985|8661|3646| 268|  0|  0|  0|5154|
|          2|1836|1511|3930|5191|5400|1760|661|  0|  0|4659|
|         12|2723|1573|4531|6126|6380| 354|  0|  0|  0|6442|
|          7|1530|1739|4753|8686|6285| 343|  0|  0|  0|6083|
|          3|1123| 902|3819|6733|7912|2172|445|  0|  0|5725|
|         11|1707|1242|4242|5577|6909|1767|471|  0|  0|5352|
|          8|2938|3302|6172|7015|3945| 203|  0|  0|  0|5750|
|          4|1910|1876|4909|6542|6491|1247| 65|  0|  0|5287|
+-----------+----+----+-

After creating the crosstab DataFrame, use a few functions to clean up the resultant DataFrame

In [19]:
crosstab_month_force = \
(crosstab_month_force
 .withColumn('month_force',
             crosstab_month_force.month_force.cast('int')) # the column is initially a string but recasting as
                                                           # an `int` will aid ordering in the next expression
 .orderBy('month_force')
 .drop('null'))

In [20]:
crosstab_month_force.show()

+-----------+----+----+----+----+----+----+---+---+---+
|month_force|   0|   1|   2|   3|   4|   5|  6|  7|  8|
+-----------+----+----+----+----+----+----+---+---+---+
|       null|   0|   0|   0|   0|   0|   0|  0|  0|  0|
|          1|1911|1176|5047|6403|5205|1117|521|164| 92|
|          2|1836|1511|3930|5191|5400|1760|661|  0|  0|
|          3|1123| 902|3819|6733|7912|2172|445|  0|  0|
|          4|1910|1876|4909|6542|6491|1247| 65|  0|  0|
|          5|2758|1688|6620|7334|3634| 736|217| 38|  0|
|          6|1389|1864|5140|7305|5744| 704|  1| 39|  0|
|          7|1530|1739|4753|8686|6285| 343|  0|  0|  0|
|          8|2938|3302|6172|7015|3945| 203|  0|  0|  0|
|          9|1716|2142|5985|8661|3646| 268|  0|  0|  0|
|         10|3140|2430|5086|5647|6053| 374| 50|  0|  0|
|         11|1707|1242|4242|5577|6909|1767|471|  0|  0|
|         12|2723|1573|4531|6126|6380| 354|  0|  0|  0|
+-----------+----+----+----+----+----+----+---+---+---+



## User Defined Functions (UDFs)

**UDF**s allow for computations of values while looking at every input row in the DataFrame. They allow you to make your own function and import functionality from other **Python** libraries.

Define a function to convert velocity from
miles per hour (mph) to meters per second (mps)

In [24]:
def mph_to_mps(mph):
    try:
        mps = mph * 0.44704
    except:
        mps = 0.0
    return mps

# Register this function as a UDF using `udf()`
mph_to_mps = udf(mph_to_mps, FloatType()) # An output type was specified

Create two new columns that are conversions of wind speeds from mph to mps

In [25]:
(
  nycflights_wind_visib_beaufort
  .withColumn('wind_speed_mps', mph_to_mps('wind_speed'))
  .withColumn('wind_gust_mps', mph_to_mps('wind_gust'))
  .withColumnRenamed('wind_speed', 'wind_speed_mph')
  .withColumnRenamed('wind_gust', 'wind_gust_mph')
  .show()
)

                                                                                

+-------------------+-------+------+------+----+--------+--------------+-------------+-----+-----+-------------+-----+--------------+-------------+
|          timestamp|carrier|flight|origin|dest|wind_dir|wind_speed_mph|wind_gust_mph|visib|force|         name|month|wind_speed_mps|wind_gust_mps|
+-------------------+-------+------+------+----+--------+--------------+-------------+-----+-----+-------------+-----+--------------+-------------+
|               null|carrier|flight|origin|dest|    null|          null|         null| null| null|         null| null|           0.0|          0.0|
|2013-01-01 05:17:00|     UA|  1545|   EWR| IAH|    null|          null|         null| null| null|         null|    1|           0.0|          0.0|
|2013-01-01 05:33:00|     UA|  1714|   LGA| IAH|    null|          null|         null| null| null|         null|    1|           0.0|          0.0|
|2013-01-01 05:42:00|     AA|  1141|   JFK| MIA|    null|          null|         null| null| null|         null|