In [1]:
import os
import numpy as np
import pandas as pd
pd.set_option('display.max_colwidth', 150)
import matplotlib.pyplot as plt
%matplotlib inline 

In [78]:
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, udf, lag, avg, year, max as _max, min as _min, row_number, unix_timestamp, from_unixtime, udf, dayofmonth, count
from pyspark.sql.window import Window
from pyspark.sql.types import StringType

### Load Datasets

In [3]:
weather_data = '/data/2019'

In [4]:
station_data = '/data/stationlist.csv'

In [5]:
country_data = '/data/countrylist.csv'

In [6]:
weather_file_list = os.listdir(weather_data)

In [7]:
weather_files = []
for file in weather_file_list:
    weather_files.append(f'{weather_data}/{file}')

In [8]:
weather_df = sqlContext.read.csv(weather_files, header=True)

In [9]:
station_df = sqlContext.read.csv(station_data, header=True)

In [10]:
station_df.show()

+------+------------+
|STN_NO|COUNTRY_ABBR|
+------+------------+
|012240|          NO|
|020690|          SW|
|020870|          SW|
|021190|          SW|
|032690|          UK|
|033450|          UK|
|039290|          UK|
|039790|          EI|
|040480|          IC|
|041300|          IC|
|060100|          FO|
|061443|          DA|
|063401|          NL|
|071910|          FR|
|092640|          GM|
|123766|          PL|
|125990|          PL|
|129700|          HU|
|132240|          HR|
|156500|          BU|
+------+------------+
only showing top 20 rows



In [11]:
country_df = sqlContext.read.csv(country_data, header=True)

In [12]:
country_df.show()

+------------+--------------------+
|COUNTRY_ABBR|        COUNTRY_FULL|
+------------+--------------------+
|          AA|               ARUBA|
|          AC| ANTIGUA AND BARBUDA|
|          AF|         AFGHANISTAN|
|          AG|             ALGERIA|
|          AI|    ASCENSION ISLAND|
|          AJ|          AZERBAIJAN|
|          AL|             ALBANIA|
|          AM|             ARMENIA|
|          AN|             ANDORRA|
|          AO|              ANGOLA|
|          AQ|      AMERICAN SAMOA|
|          AR|           ARGENTINA|
|          AS|           AUSTRALIA|
|          AT|ASHMORE AND CARTI...|
|          AU|             AUSTRIA|
|          AV|            ANGUILLA|
|          AX|             ANTIGUA|
|          AY|          ANTARCTICA|
|          AZ|              AZORES|
|          BA|             BAHRAIN|
+------------+--------------------+
only showing top 20 rows



### Join Station and Country dataframes to get full country names

In [13]:
station_country_df = station_df.join(country_df, on=['COUNTRY_ABBR'], how='inner')

In [14]:
station_country_df.show()

+------------+------+--------------+
|COUNTRY_ABBR|STN_NO|  COUNTRY_FULL|
+------------+------+--------------+
|          NO|012240|        NORWAY|
|          SW|020690|        SWEDEN|
|          SW|020870|        SWEDEN|
|          SW|021190|        SWEDEN|
|          UK|032690|UNITED KINGDOM|
|          UK|033450|UNITED KINGDOM|
|          UK|039290|UNITED KINGDOM|
|          EI|039790|       IRELAND|
|          IC|040480|       ICELAND|
|          IC|041300|       ICELAND|
|          FO|060100| FAROE ISLANDS|
|          DA|061443|       DENMARK|
|          NL|063401|   NETHERLANDS|
|          FR|071910|        FRANCE|
|          GM|092640|       GERMANY|
|          PL|123766|        POLAND|
|          PL|125990|        POLAND|
|          HU|129700|       HUNGARY|
|          HR|132240|       CROATIA|
|          BU|156500|      BULGARIA|
+------------+------+--------------+
only showing top 20 rows



### Join Weather and Country dataframe to get full country names with weather

In [15]:
weather_df.show()

+------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+
|STN---| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD| GUST|  MAX|  MIN| PRCP| SNDP|FRSHTT|
+------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+
|010260|99999|20190101|26.1|21.2|1001.9| 987.5| 20.6| 9.0| 15.9| 29.7| 29.8|21.7*|0.02G| 18.5|001000|
|010260|99999|20190102|24.9|22.1|1020.1|1005.5|  5.4| 5.6| 13.6| 22.1|27.1*| 20.7|0.48G| 22.8|001000|
|010260|99999|20190103|31.7|29.1|1008.9| 994.7| 13.6|11.6| 21.4| 49.5|37.4*|26.8*|0.25G|999.9|011000|
|010260|99999|20190104|32.9|30.3|1011.4| 997.1| 15.8| 4.9|  7.8| 10.9| 36.1| 31.8|0.52G|999.9|001000|
|010260|99999|20190105|35.5|33.0|1015.7|1001.4| 12.0|10.4| 13.6| 21.0|38.5*| 32.7|0.02G| 23.6|010000|
|010260|99999|20190106|38.5|34.1|1008.2| 994.2| 12.8|10.0| 17.5| 28.9| 41.4|33.8*|0.12G| 23.2|010000|
|010260|99999|20190107|32.1|29.8| 996.8| 982.7|  6.9|11.3| 15.5| 28.6|35.1*| 30.4|

In [16]:
# rename station number column in weather df to join stationcountry df
weather_country_df = weather_df.withColumnRenamed('STN---', 'STN_NO').join(station_country_df, on=['STN_NO'], how='inner')

In [17]:
weather_country_df.show()

+------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+------------+------------+
|STN_NO| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD| GUST|  MAX|  MIN| PRCP| SNDP|FRSHTT|COUNTRY_ABBR|COUNTRY_FULL|
+------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+------------+------------+
|010875|99999|20190101|41.1|30.1|9999.9|9999.9|  5.9|46.7| 59.1| 74.0|44.6*|37.4*|99.99|999.9|011010|          NO|      NORWAY|
|010875|99999|20190102|40.5|29.0|9999.9|9999.9|  6.2|20.5| 32.1| 44.1|41.0*|37.4*|99.99|999.9|010000|          NO|      NORWAY|
|010875|99999|20190103|43.0|36.6|9999.9|9999.9|  6.1|13.5| 21.0|999.9|44.6*|41.0*|0.00I|999.9|000000|          NO|      NORWAY|
|010875|99999|20190104|46.7|44.4|9999.9|9999.9|  5.8|27.4| 33.0| 40.0|48.2*|42.8*|99.99|999.9|010000|          NO|      NORWAY|
|010875|99999|20190105|46.5|44.1|9999.9|9999.9|  6.1|18.3| 25.1|999.9|48.2*|44.6*|99.99|999.9|010000|   

### Questions

#### Which country had the hottest average mean temperature over the year?

In [18]:
# add year and day of month to weather dataframe from string date column
weather_by_year = weather_country_df.withColumn("DATE", from_unixtime(unix_timestamp('YEARMODA', 'yyyyMMdd'))).withColumn('YEAR',year('DATE')).\
                                     withColumn('DAY', dayofmonth('DATE'))

In [21]:
avg_temp_grouped_by_year_country = weather_by_year.groupby("YEAR", "COUNTRY_FULL").agg(avg("TEMP").alias('AVG_TEMP'))

In [22]:
avg_temp_grouped_by_year_country.show()

+----+--------------+------------------+
|YEAR|  COUNTRY_FULL|          AVG_TEMP|
+----+--------------+------------------+
|2019|        CANADA| 35.28871680510012|
|2019|        GUINEA| 81.46839080459773|
|2019|          NIUE| 76.27808219178088|
|2019|  TURKMENISTAN|63.430000000000014|
|2019|      THAILAND|    82.51935144935|
|2019|         ITALY| 59.62432907511501|
|2020|MIDWAY ISLANDS|              62.8|
|2019|       HUNGARY|53.596376158071635|
|2019|    MARTINIQUE| 78.12547945205476|
|2019|        TAIWAN| 75.89780821917807|
|2019|          IRAN| 67.00280045318536|
|2019|         NEPAL| 72.13298855633802|
|2019|        UGANDA| 75.76807359307358|
|2019|      TANZANIA| 74.20567612687815|
|2019|       ESTONIA|45.701339132425325|
|2019|     SINGAPORE| 83.63278538812786|
|2019|       SENEGAL| 83.11452655889146|
|2019|          FIJI| 78.45093764650726|
|2019|   WAKE ISLAND| 81.39258114374036|
|2019|         ARUBA| 82.97342465753424|
+----+--------------+------------------+
only showing top

In [23]:
w_avg_temp = Window().partitionBy("YEAR").orderBy(col("AVG_TEMP").desc())

In [24]:
# show the countries by year that had maximum average temperatures by using row number (rn=1) over the ordered partition

# the two countries with the hottest mean temp in each year are shown below:  2019 Djibouti, 2020 Marshall Islands

avg_temp_grouped_by_year_country.withColumn("rn", row_number().over(w_avg_temp)).where(col("rn") == 1).select("YEAR", "COUNTRY_FULL").show()

+----+----------------+
|YEAR|    COUNTRY_FULL|
+----+----------------+
|2019|        DJIBOUTI|
|2020|MARSHALL ISLANDS|
+----+----------------+



#### Which country had the most consecutive days of tornadoes/funnel cloud formations?

In [25]:
# function to strip tornado indicator from string
udf1 = udf(lambda x: x[-1], StringType())

In [26]:
# extract toronado indicator and add to weather dataframe as TORNADO_IND
weather_by_year_tornado_ind = weather_by_year.withColumn('TORNADO_IND',udf1('FRSHTT'))

In [34]:
# number of tornado events to check that indicator works
weather_by_year_tornado_ind.select('TORNADO_IND').where(col('TORNADO_IND')==1).count()

309

In [70]:
w_day = Window().partitionBy("COUNTRY_FULL").orderBy(col('DAY').asc())

In [82]:
# perform a difference of previous day of tornado event to day of tornado event for each country
# count the number of differences of 1 to indicate consecutive days for each country

# the country with the most consecutive tornados is shown below: United States 14

weather_by_year_tornado_ind.filter(col('TORNADO_IND')==1).withColumn('PREV_DAY', lag('DAY',1).over(w_day))\
                    .withColumn('DAYDIFF', col('DAY')-col('PREV_DAY')).select('COUNTRY_FULL','DAY','PREV_DAY','DAYDIFF').filter(col('DAYDIFF')==1).\
                    groupBy('COUNTRY_FULL').agg(count(col('DAYDIFF')).alias('NUM_CONSECUTIVE_DAYS')).orderBy(col('NUM_CONSECUTIVE_DAYS').desc()).show()

+--------------------+--------------------+
|        COUNTRY_FULL|NUM_CONSECUTIVE_DAYS|
+--------------------+--------------------+
|       UNITED STATES|                  14|
|               ITALY|                  11|
|      CAYMAN ISLANDS|                   7|
|               INDIA|                   3|
|              CANADA|                   3|
|               JAPAN|                   3|
|          COSTA RICA|                   2|
|NETHERLANDS ANTILLES|                   2|
|           INDONESIA|                   2|
|               GHANA|                   2|
|         BAHAMAS THE|                   1|
|                MALI|                   1|
|             NIGERIA|                   1|
|              NORWAY|                   1|
|              ANGOLA|                   1|
|              TURKEY|                   1|
|             AUSTRIA|                   1|
|                LAOS|                   1|
|       COTE D'IVOIRE|                   1|
+--------------------+----------

#### Which country had the second highest average mean wind speed over the year?

In [84]:
avg_wind_grouped_by_year_country = weather_by_year.groupby("YEAR", "COUNTRY_FULL").agg(avg("WDSP").alias('AVG_WIND'))

In [85]:
w_avg_wind = Window().partitionBy("YEAR").orderBy(col("AVG_WIND").desc())

In [86]:
# show the countries by year that had the second largest average wind speed by using second row number (rn=2) over the ordered partition

# the two countries with the second highest avg mean wind speed in each year are shown below: 2019 Armenia, 2020 Bermuda

avg_wind_grouped_by_year_country.withColumn("rn", row_number().over(w_avg_wind)).where(col("rn") == 2).select("YEAR", "COUNTRY_FULL").show()

+----+------------+
|YEAR|COUNTRY_FULL|
+----+------------+
|2019|     ARMENIA|
|2020|     BERMUDA|
+----+------------+

