# General Set-Up

To accomplish this task, I used a Windows machine with Pyspark set-up. And since it contains multiple questions, I would rely on Jupyter Notebook to quickly prototype a solution with some comments. 

In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col, expr
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, udf
from pyspark.sql import functions as F


spark = SparkSession.builder.appName("victor_paytm").getOrCreate()
cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
spark

# Step 1
### 1. Load the global weather data into Spark

In [2]:
partitioned =spark.read.option("header", "true").csv('data/2019/')

In [3]:
# select only the first four characters from the column 
partitioned = partitioned.withColumn("PRCP",expr("substring(PRCP, 1, length(PRCP)-1)"))

# there are characters in these columns
partitioned = partitioned.withColumn("MAX",partitioned.MAX.substr(1, 4))
partitioned = partitioned.withColumn("MIN",partitioned.MAX.substr(1, 4))

# the column name is not appropriate
partitioned =partitioned.withColumnRenamed("STN---", "STN")
partitioned.show(5)

+------+-----+--------+----+----+------+------+-----+----+-----+----+----+----+----+-----+------+
|   STN| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD|GUST| MAX| MIN|PRCP| SNDP|FRSHTT|
+------+-----+--------+----+----+------+------+-----+----+-----+----+----+----+----+-----+------+
|010260|99999|20190101|26.1|21.2|1001.9| 987.5| 20.6| 9.0| 15.9|29.7|29.8|29.8|0.02| 18.5|001000|
|010260|99999|20190102|24.9|22.1|1020.1|1005.5|  5.4| 5.6| 13.6|22.1|27.1|27.1|0.48| 22.8|001000|
|010260|99999|20190103|31.7|29.1|1008.9| 994.7| 13.6|11.6| 21.4|49.5|37.4|37.4|0.25|999.9|011000|
|010260|99999|20190104|32.9|30.3|1011.4| 997.1| 15.8| 4.9|  7.8|10.9|36.1|36.1|0.52|999.9|001000|
|010260|99999|20190105|35.5|33.0|1015.7|1001.4| 12.0|10.4| 13.6|21.0|38.5|38.5|0.02| 23.6|010000|
+------+-----+--------+----+----+------+------+-----+----+-----+----+----+----+----+-----+------+
only showing top 5 rows



### Then, I would handle the missing value in this case

In [4]:
def replace(column, value):
    return when(column != value, column).otherwise(lit(None))

nullDict = {'TEMP': 9999.9,
            'DEWP': 9999.9,
            'SLP': 9999.9,
            'STP': 9999.9,
            'VISIB': 999.9,
            'WDSP': 999.9,
            'MXSPD': 999.9,
            'GUST':999.9,
            'MAX':9999.9,
            'MIN':9999.9,
            'PRCP':99.9,
            'SNDP':999.9}
# loop the dict to deal with NA
for key, val in nullDict.items():
    partitioned = partitioned.withColumn(key,replace(col(key), val))

In [5]:
partitioned.show(5)

+------+-----+--------+----+----+------+------+-----+----+-----+----+----+----+----+----+------+
|   STN| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD|GUST| MAX| MIN|PRCP|SNDP|FRSHTT|
+------+-----+--------+----+----+------+------+-----+----+-----+----+----+----+----+----+------+
|010260|99999|20190101|26.1|21.2|1001.9| 987.5| 20.6| 9.0| 15.9|29.7|29.8|29.8|0.02|18.5|001000|
|010260|99999|20190102|24.9|22.1|1020.1|1005.5|  5.4| 5.6| 13.6|22.1|27.1|27.1|0.48|22.8|001000|
|010260|99999|20190103|31.7|29.1|1008.9| 994.7| 13.6|11.6| 21.4|49.5|37.4|37.4|0.25|null|011000|
|010260|99999|20190104|32.9|30.3|1011.4| 997.1| 15.8| 4.9|  7.8|10.9|36.1|36.1|0.52|null|001000|
|010260|99999|20190105|35.5|33.0|1015.7|1001.4| 12.0|10.4| 13.6|21.0|38.5|38.5|0.02|23.6|010000|
+------+-----+--------+----+----+------+------+-----+----+-----+----+----+----+----+----+------+
only showing top 5 rows



###  2. Join the stationlist.csv with the countrylist.csv to get the full country name for each station number.

In [6]:
stationlist = spark.read.option("header", "true").csv('stationlist.csv')
stationlist.show(5)

+------+------------+
|STN_NO|COUNTRY_ABBR|
+------+------------+
|012240|          NO|
|020690|          SW|
|020870|          SW|
|021190|          SW|
|032690|          UK|
+------+------------+
only showing top 5 rows



In [7]:
countrylist = spark.read.option("header", "true").csv('countrylist.csv')
countrylist.show(5)

+------------+-------------------+
|COUNTRY_ABBR|       COUNTRY_FULL|
+------------+-------------------+
|          AA|              ARUBA|
|          AC|ANTIGUA AND BARBUDA|
|          AF|        AFGHANISTAN|
|          AG|            ALGERIA|
|          AI|   ASCENSION ISLAND|
+------------+-------------------+
only showing top 5 rows



In [8]:
stationWithCountry = stationlist.join(countrylist,stationlist.COUNTRY_ABBR ==  countrylist.COUNTRY_ABBR,"left")

### 3. Join the global weather data with the full country names by station number

In [9]:
globWeatherCountry = partitioned.join(stationWithCountry, partitioned.STN == stationWithCountry.STN_NO,"left")
globWeatherCountry.show(5)

+------+-----+--------+----+----+------+------+-----+----+-----+----+----+----+----+----+------+------+------------+------------+------------+
|   STN| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD|GUST| MAX| MIN|PRCP|SNDP|FRSHTT|STN_NO|COUNTRY_ABBR|COUNTRY_ABBR|COUNTRY_FULL|
+------+-----+--------+----+----+------+------+-----+----+-----+----+----+----+----+----+------+------+------------+------------+------------+
|010260|99999|20190101|26.1|21.2|1001.9| 987.5| 20.6| 9.0| 15.9|29.7|29.8|29.8|0.02|18.5|001000|010260|          NO|          NO|      NORWAY|
|010260|99999|20190102|24.9|22.1|1020.1|1005.5|  5.4| 5.6| 13.6|22.1|27.1|27.1|0.48|22.8|001000|010260|          NO|          NO|      NORWAY|
|010260|99999|20190103|31.7|29.1|1008.9| 994.7| 13.6|11.6| 21.4|49.5|37.4|37.4|0.25|null|011000|010260|          NO|          NO|      NORWAY|
|010260|99999|20190104|32.9|30.3|1011.4| 997.1| 15.8| 4.9|  7.8|10.9|36.1|36.1|0.52|null|001000|010260|          NO|          NO|      NORWAY|

# STEP 2
### 1. Which country had the hottest average mean temperature over the year?

In [10]:
from pyspark.sql.types import IntegerType
# change type to integer

globWeatherCountry.withColumn("TEMP", globWeatherCountry["TEMP"].cast(IntegerType()))\
.groupBy("COUNTRY_FULL").avg("TEMP").\
withColumnRenamed("avg(TEMP)", "avg_mean_temp").\
sort("avg_mean_temp",ascending=False).show(5)

+------------+-----------------+
|COUNTRY_FULL|    avg_mean_temp|
+------------+-----------------+
|    DJIBOUTI|89.56927710843374|
|        CHAD|  86.908357771261|
|       NIGER|84.61118216700164|
|       SUDAN|84.01162790697674|
| EL SALVADOR| 83.9929676511955|
+------------+-----------------+
only showing top 5 rows



### So, DJIBOUTI had the hottest average mean temperature over the year

### 2. Which country had the most consecutive days of tornadoes/funnel cloud formations?

In [11]:
tornado_df = globWeatherCountry.\
select('COUNTRY_FULL','YEARMODA','FRSHTT').\
withColumn('Tornado',partitioned.FRSHTT.substr(6,7))
tornado_df = tornado_df.withColumn("Tornado", tornado_df["Tornado"].cast(IntegerType()))


tornado_df = tornado_df.groupBy(['COUNTRY_FULL','YEARMODA']).agg({'Tornado':'max'}).\
withColumnRenamed("max(Tornado)","max_tornado_by_country_day").\
sort(["COUNTRY_FULL",'YEARMODA'],ascending=False)
tornado_df.show(5)

+------------+--------+--------------------------+
|COUNTRY_FULL|YEARMODA|max_tornado_by_country_day|
+------------+--------+--------------------------+
|    ZIMBABWE|20191231|                         0|
|    ZIMBABWE|20191230|                         0|
|    ZIMBABWE|20191229|                         0|
|    ZIMBABWE|20191228|                         0|
|    ZIMBABWE|20191227|                         0|
+------------+--------+--------------------------+
only showing top 5 rows



In [12]:
# To solve this problem, I used a self-defined UDF to aggregate all data
def mostConsecutive(seq):
    globalMax = 0
    localMax = 0
    for fast in range(len(seq)):
        if seq[fast] == 1:
            localMax += 1
            globalMax = max(globalMax, localMax)
        else:
            localMax = 0
    return globalMax


# doing a small test
seq = [0,0,1,1,1,1,1,0,0,1,1,1,0]
mostConsecutive(seq)


5

In [13]:
udfMostConsecutive = F.udf(mostConsecutive, IntegerType())
tornado_df.\
groupBy('COUNTRY_FULL').\
agg(udfMostConsecutive(F.collect_list('max_tornado_by_country_day')).alias('num_tornadoes')).\
sort("num_tornadoes",ascending=False).show(10)

+--------------+-------------+
|  COUNTRY_FULL|num_tornadoes|
+--------------+-------------+
|         GHANA|            2|
|CAYMAN ISLANDS|            2|
|         ITALY|            2|
|         JAPAN|            2|
|        CANADA|            2|
|         INDIA|            2|
| UNITED STATES|            2|
|      MALDIVES|            1|
|       SENEGAL|            1|
|        JERSEY|            1|
+--------------+-------------+
only showing top 10 rows



### So, there are seven countries had the same consecutive Tornadoes days. Which are:
1. India, 2 days
2. Japan, 2 days
3. Canada, 2 days
4. Cayman Islands, 2 days
5. Italy, 2 days
6. Ghana, 2 days
7. United States, 2 days

### 3. Which country had the second highest average mean wind speed over the year?

In [14]:
globWeatherCountry.withColumn("WDSP", globWeatherCountry["WDSP"].cast(IntegerType())).\
groupBy("COUNTRY_FULL").\
avg("WDSP").\
withColumnRenamed("avg(WDSP)", "avg_mean_wind_speed").\
sort("avg_mean_wind_speed",ascending=False).\
show(5)

+--------------------+-------------------+
|        COUNTRY_FULL|avg_mean_wind_speed|
+--------------------+-------------------+
|FALKLAND ISLANDS ...| 17.429920477137177|
|               ARUBA| 15.513661202185792|
|       FAROE ISLANDS| 14.845360824742269|
|FRENCH SOUTHERN A...| 13.769833496571989|
|            BARBADOS| 13.653005464480874|
+--------------------+-------------------+
only showing top 5 rows



### The second higher average mean wind speed is Aruba