In [1]:
import os
from pyspark.sql import SparkSession

In [2]:
input_prefix = 'input'
# want test code to take SparkSession as input because we want this to be able to run on a spark cluster
spark = SparkSession.builder.getOrCreate()

In [3]:
from etl.load import load
from etl.clean import nullify_missing_values, parse_date

In [4]:
countrylist, stationlist, data = load(spark=spark, input_prefix=input_prefix)

In [5]:
data.show()

+------+-----+--------+----+----+------+------+-----+----+-----+-----+----+----+----+-----+------+
|STN---| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD| GUST| MAX| MIN|PRCP| SNDP|FRSHTT|
+------+-----+--------+----+----+------+------+-----+----+-----+-----+----+----+----+-----+------+
| 10260|99999|20190101|26.1|21.2|1001.9| 987.5| 20.6| 9.0| 15.9| 29.7|29.8|null|null| 18.5|001000|
| 10260|99999|20190102|24.9|22.1|1020.1|1005.5|  5.4| 5.6| 13.6| 22.1|null|20.7|null| 22.8|001000|
| 10260|99999|20190103|31.7|29.1|1008.9| 994.7| 13.6|11.6| 21.4| 49.5|null|null|null|999.9|011000|
| 10260|99999|20190104|32.9|30.3|1011.4| 997.1| 15.8| 4.9|  7.8| 10.9|36.1|31.8|null|999.9|001000|
| 10260|99999|20190105|35.5|33.0|1015.7|1001.4| 12.0|10.4| 13.6| 21.0|null|32.7|null| 23.6|010000|
| 10260|99999|20190106|38.5|34.1|1008.2| 994.2| 12.8|10.0| 17.5| 28.9|41.4|null|null| 23.2|010000|
| 10260|99999|20190107|32.1|29.8| 996.8| 982.7|  6.9|11.3| 15.5| 28.6|null|30.4|null|999.9|001000|
| 10260|99

In [6]:
data = nullify_missing_values(data)
data = parse_date(data)

In [7]:
data.show()

+------+-----+-------------------+----+----+------+------+-----+----+-----+----+----+----+----+----+------+
|STN---| WBAN|           YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD|GUST| MAX| MIN|PRCP|SNDP|FRSHTT|
+------+-----+-------------------+----+----+------+------+-----+----+-----+----+----+----+----+----+------+
| 10260|99999|2019-01-01 00:00:00|26.1|21.2|1001.9| 987.5| 20.6| 9.0| 15.9|29.7|29.8|null|null|18.5|001000|
| 10260|99999|2019-01-02 00:00:00|24.9|22.1|1020.1|1005.5|  5.4| 5.6| 13.6|22.1|null|20.7|null|22.8|001000|
| 10260|99999|2019-01-03 00:00:00|31.7|29.1|1008.9| 994.7| 13.6|11.6| 21.4|49.5|null|null|null|null|011000|
| 10260|99999|2019-01-04 00:00:00|32.9|30.3|1011.4| 997.1| 15.8| 4.9|  7.8|10.9|36.1|31.8|null|null|001000|
| 10260|99999|2019-01-05 00:00:00|35.5|33.0|1015.7|1001.4| 12.0|10.4| 13.6|21.0|null|32.7|null|23.6|010000|
| 10260|99999|2019-01-06 00:00:00|38.5|34.1|1008.2| 994.2| 12.8|10.0| 17.5|28.9|41.4|null|null|23.2|010000|
| 10260|99999|2019-01-07 00:

In [8]:
station_attr = countrylist.join(stationlist, on='COUNTRY_ABBR', how='outer')

In [9]:
data_full = data.join(
    station_attr.withColumnRenamed('STN_NO', 'STN---'),
    on='STN---',
    how='left'
).cache()

In [11]:
data_full.limit(100).toPandas()

Unnamed: 0,STN---,WBAN,YEARMODA,TEMP,DEWP,SLP,STP,VISIB,WDSP,MXSPD,GUST,MAX,MIN,PRCP,SNDP,FRSHTT,COUNTRY_ABBR,COUNTRY_FULL
0,13840,99999,2019-01-01,38.599998,24.799999,,,,11.7,33.599998,44.099998,,,,,000000,NO,NORWAY
1,13840,99999,2019-01-02,30.500000,16.600000,,,,8.0,23.700001,35.000000,,,,,000000,NO,NORWAY
2,13840,99999,2019-01-03,32.200001,21.700001,,,,3.5,13.000000,,,,,,000000,NO,NORWAY
3,13840,99999,2019-01-04,34.200001,27.200001,,,,2.6,6.000000,,,,,,000000,NO,NORWAY
4,13840,99999,2019-01-05,30.600000,27.400000,,,15.5,2.0,6.400000,,,,,,100000,NO,NORWAY
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,13840,99999,2019-04-06,37.700001,31.799999,,,9.8,3.5,11.100000,,,,,,111000,NO,NORWAY
96,13840,99999,2019-04-07,39.000000,24.299999,,,11.1,8.0,17.100000,,,,,,000000,NO,NORWAY
97,13840,99999,2019-04-08,36.599998,16.000000,,,15.5,11.9,19.400000,26.000000,,,,,000000,NO,NORWAY
98,13840,99999,2019-04-09,33.000000,11.600000,,,13.7,9.3,19.600000,25.100000,,,,,000000,NO,NORWAY


Step 2 - Questions

Using the global weather data, answer the following:

1. Which country had the hottest average mean temperature over the year?
2. Which country had the most consecutive days of tornadoes/funnel cloud
formations?
3. Which country had the second highest average mean wind speed over the year?

In [14]:
import pyspark.sql.functions as F

In [17]:
data_full.withColumn(
    'month', F.month(F.col('YEARMODA'))
).groupBy(
    'COUNTRY_FULL', 'month'
).agg(
    # possibly multiple weather stations (unverified)
    # thus aggregate by monthly first
    # ideally aggregate by day -> by month -> by year
    # leaving that for later if there's time
    F.mean('TEMP').alias('mean_monthly_TEMP')
).groupBy(
    'COUNTRY_FULL'
).agg(
    F.mean('mean_monthly_TEMP').alias('monthly_TEMP')
).orderBy(
    F.col('monthly_TEMP'), ascending=False
).show()

+--------------------+-----------------+
|        COUNTRY_FULL|     monthly_TEMP|
+--------------------+-----------------+
|            DJIBOUTI|90.23198015990424|
|                CHAD|87.14639472082757|
|               SUDAN|85.07542338470381|
|               NIGER|85.03855115744923|
| JUAN DE NOVA ISLAND|84.51182777650894|
|                MALI| 84.4698450533835|
|         EL SALVADOR|84.44803416216793|
|              TUVALU|84.34886658865116|
|BRITISH INDIAN OC...| 83.9194399820983|
|             TOKELAU|83.84275418635283|
|      CAYMAN ISLANDS|83.76236167953455|
|        BURKINA FASO|83.75583106326813|
|            MALDIVES|83.67978358497757|
|           SINGAPORE|83.63143542755746|
|ST. VINCENT AND T...|83.54223237327423|
|            CAMBODIA|83.50356529178372|
|    MARSHALL ISLANDS| 83.2364911782702|
|          MICRONESIA|83.23250023814408|
|             SENEGAL|83.13447681132101|
|            KIRIBATI|83.07528320407322|
+--------------------+-----------------+
only showing top

Q: Which country had the hottest average mean temperature over the year?

A: DJIBOUTI

makes, sense, it's in a dry part of Africa

Which country had the most consecutive days of tornadoes/funnel cloud formations?


In [33]:
from pyspark.sql.window import Window

# do we have daily observations for each country?
# can only compute for consecutive dates
w = Window.partitionBy('COUNTRY_FULL').orderBy('YEARMODA')
country_date_diff = (
    data_full
    .select('COUNTRY_FULL', 'YEARMODA')
    .distinct()
    .withColumn(
       'inter_date',
        F.datediff(
            F.col('YEARMODA'),
            F.lag(F.col('YEARMODA'), 1).over(w)
        )
    )
).orderBy(
    'COUNTRY_FULL', 'YEARMODA'
)

# filter for countries with inter_date only null or 1
country_single_consecutive = (
    country_date_diff
    .groupBy('COUNTRY_FULL')
    .agg(F.collect_set('inter_date').alias('set_inter_date'))
) #.show()
# filter(
#     F.col('set_inter_date') == {None, 1}
# )
country_single_consecutive.show()

SyntaxError: invalid syntax (<ipython-input-33-dcfa0ad2c689>, line 29)

In [18]:
data_full.withColumn(
    'month', F.month(F.col('YEARMODA'))
).groupBy(
    'COUNTRY_FULL', 'month'
).agg(
    F.mean('WDSP').alias('mean_monthly_WDSP')
).groupBy(
    'COUNTRY_FULL'
).agg(
    F.mean('mean_monthly_WDSP').alias('monthly_WDSP')
).orderBy(
    F.col('monthly_WDSP'), ascending=False
).show()

+--------------------+------------------+
|        COUNTRY_FULL|      monthly_WDSP|
+--------------------+------------------+
|FALKLAND ISLANDS ...|17.853323291643207|
|               ARUBA|15.977568466445604|
|       FAROE ISLANDS|15.004640085348584|
|            BARBADOS|14.104861833330643|
|ST. PIERRE AND MI...|13.964271671947186|
|FRENCH SOUTHERN A...|13.721410918455398|
|          CAPE VERDE|13.689624769963283|
|          MAURITANIA|13.049711560945854|
|     TROMELIN ISLAND|12.996336024862229|
|          ST. HELENA| 12.70270694631401|
|          ANTARCTICA|12.317377902614764|
|             SOMALIA|12.270474730728104|
|COCOS (KEELING) I...| 12.04136264134967|
|            GUERNSEY|12.027518638882889|
|        MAN  ISLE OF|11.902185717540952|
|          MONTSERRAT|11.813225809104981|
|             ICELAND|11.793342469451671|
|      WESTERN SAHARA|11.652829126173785|
|           ST. LUCIA|11.580525361820179|
|            SVALBARD|11.489402906171904|
+--------------------+------------

Q: Which country had the second highest average mean wind speed over the year?

A: ARUBA|15.977568466445604

Makes sense! It's right in the middle of the Caribbean, which has hurricanes.