In [241]:
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.ml.feature import Imputer
from pyspark.sql.types import FloatType

In [66]:
# Initiate (or reuse) a single-machine Spark session for this analysis.
# NOTE(review): with master("local") the executor runs inside the driver JVM,
# so spark.executor.memory probably has no effect here; spark.driver.memory is
# likely the setting that actually controls heap size -- confirm and adjust.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Tech Challenge")
    .config("spark.executor.memory", "2gb")
    .getOrCreate()
)

In [268]:
# Load the 2019 weather readings plus the country and station lookup tables.
# Every file has a header row; no schema is inferred, so all columns are read
# as strings and must be cast explicitly where numbers are needed.
DATA_DIR = "paytmteam-de-weather-challenge-beb4fc53605c"

data = spark.read.csv(f"{DATA_DIR}/data/2019/*", header=True)
countries = spark.read.csv(f"{DATA_DIR}/countrylist.csv", header=True)
stations = spark.read.csv(f"{DATA_DIR}/stationlist.csv", header=True)

In [269]:
# Both the stations and the countries tables carry a COUNTRY_ABBR column;
# give the stations copy its own name so the join below is unambiguous.
stations = stations.withColumnRenamed(
    existing="COUNTRY_ABBR",
    new="STATION_COUNTRY_ABBR",
)

In [270]:
# Sentinel strings that mark a missing reading in the raw data, per column.
# Matching is done on the raw string values, before any cast.
MISSING_SENTINELS = {"TEMP": "9999.9", "WDSP": "999.9"}

# Step 1: turn each sentinel into a real null.
for col_name, sentinel in MISSING_SENTINELS.items():
    data = data.withColumn(
        col_name,
        F.when(F.col(col_name) == sentinel, None).otherwise(F.col(col_name)),
    )

# Step 2: cast the cleaned string columns to float so they can be averaged.
for col_name in MISSING_SENTINELS:
    data = data.withColumn(col_name, F.col(col_name).cast(FloatType()))

In [273]:
# Attach each station's country record, then attach every weather reading to
# its station (readings carry the station id in the "STN---" column).
dataset = (
    stations
    .join(countries, on=stations.STATION_COUNTRY_ABBR == countries.COUNTRY_ABBR, how="inner")
    .join(data, on=F.col("STN---") == stations.STN_NO, how="inner")
)

In [274]:
# Replace missing temperature values with the dataset-wide mean.
# The 9999.9 sentinel was already converted to null in the cleaning cell, and
# Spark's Imputer always treats nulls as missing, so no `missingValue` override
# is needed. (The old missingValue=9999.9 could not have matched anyway, because
# TEMP is now a FloatType column. Overriding it also disabled the default NaN
# handling, so a stray NaN would have poisoned the mean.)
# NOTE(review): one global mean pulls every country's average toward the
# global average; if per-country means are the goal, consider imputing per
# country or simply letting F.mean skip nulls.
imputer = Imputer(strategy='mean', inputCol="TEMP", outputCol='TEMP_IMPUTED')
impute_model = imputer.fit(dataset)

dataset_temp_imputed = impute_model.transform(dataset)

In [275]:
# Replace missing wind speed values with the dataset-wide mean.
# The 999.9 sentinel was already converted to null in the cleaning cell, so
# TEMP and WDSP now share the same missing marker (null). Both could therefore
# be imputed in one pass with Imputer(inputCols=[...], outputCols=[...]). They
# are kept as two steps only so `dataset_temp_imputed` remains available.
# No `missingValue` override: nulls are always treated as missing, and the
# default (NaN) keeps NaNs out of the mean.
imputer = Imputer(strategy='mean', inputCol="WDSP", outputCol='WDSP_IMPUTED')
impute_model = imputer.fit(dataset_temp_imputed)

dataset_both_imputed = impute_model.transform(dataset_temp_imputed)

# Which country had the hottest average mean temperature over the year?

In [276]:
# Average the (imputed) per-reading temperature within each country; after
# sorting hottest-first, the first row is the answer.
(
    dataset_both_imputed
    .groupBy("COUNTRY_FULL")
    .agg(F.avg("TEMP_IMPUTED").alias("avg_temp"))
    .orderBy(F.desc("avg_temp"))
    .first()
)

Row(COUNTRY_FULL='DJIBOUTI', avg_temp=90.06114474836602)

Djibouti had the hottest average temperature for the year at 90.1 degrees Fahrenheit.

# Which country had the most consecutive days of tornadoes/funnel cloud formations?

In [230]:
# Longest run of consecutive calendar days on which at least one station in a
# country reported a tornado / funnel cloud.
#
# FRSHTT is read as a string; its 6th character (1-based) is used as the
# tornado/funnel-cloud flag -- presumably '1' = observed, '0' = not (confirm
# against the dataset documentation). It is cast to int so the max() and the
# comparison below are numeric rather than string-based.
# Assumes YEARMODA is a yyyyMMdd date string; rows that fail to parse are dropped.
tornado_days = (
    dataset_both_imputed
    .withColumn("tornado", F.substring(F.col("FRSHTT"), 6, 1).cast("int"))
    .withColumn("obs_date", F.to_date(F.col("YEARMODA"), "yyyyMMdd"))
    .groupBy("COUNTRY_FULL", "obs_date")
    .agg(F.max("tornado").alias("tornado_today"))
    .filter((F.col("tornado_today") == 1) & F.col("obs_date").isNotNull())
)

# Gaps-and-islands on real calendar dates: within a country, consecutive
# tornado days share the same (day number - row number) value. Using the date
# itself, rather than a row number over whichever days happen to be present,
# prevents a day with no data for the country from being bridged into a streak.
by_date = Window.partitionBy("COUNTRY_FULL").orderBy("obs_date")
day_number = F.datediff(F.col("obs_date"), F.to_date(F.lit("1970-01-01")))

tornado_streaks = (
    tornado_days
    .withColumn("streak_id", day_number - F.row_number().over(by_date))
    .groupBy("COUNTRY_FULL", "streak_id")
    .agg(F.count("*").alias("streak"))
)

# Longest streak per country, longest first. Countries with no tornado days
# at all do not appear in this table.
(
    tornado_streaks
    .groupBy("COUNTRY_FULL")
    .agg(F.max("streak").alias("max_streak"))
    .orderBy(F.desc("max_streak"))
    .show()
)

+--------------+----------+
|  COUNTRY_FULL|max_streak|
+--------------+----------+
|         INDIA|         2|
|         JAPAN|         2|
|        CANADA|         2|
|CAYMAN ISLANDS|         2|
|         ITALY|         2|
| UNITED STATES|         2|
|         GHANA|         2|
|         MALTA|         1|
|    COSTA RICA|         1|
|         NIGER|         1|
|   BAHAMAS THE|         1|
|       ALGERIA|         1|
|      GUERNSEY|         1|
|         ARUBA|         1|
|       SENEGAL|         1|
|   PUERTO RICO|         1|
|      COLOMBIA|         1|
|        JERSEY|         1|
|    BANGLADESH|         1|
|      TANZANIA|         1|
+--------------+----------+
only showing top 20 rows



Seven countries tied for the most consecutive days of tornadoes: India, Japan, Canada, the Cayman Islands, Italy, the United States, and Ghana, each with a 2-day streak.

# Which country had the second highest average mean wind speed over the year?

In [277]:
# Mean (imputed) wind speed per country, fastest first. Only the first two
# rows are fetched; the second one is the runner-up.
top_two = (
    dataset_both_imputed
    .groupBy("COUNTRY_FULL")
    .agg(F.avg("WDSP_IMPUTED").alias("avg_wdsp"))
    .orderBy(F.desc("avg_wdsp"))
    .take(2)
)
avg_windspeed = top_two[1]

print(avg_windspeed)

Row(COUNTRY_FULL='ARUBA', avg_wdsp=15.97568304160905)


Aruba had the second-highest average wind speed, at about 16 knots.

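Taking the first two rows and picking the last one works for selecting the second-ranked country. However, it scales poorly for selecting the Nth element when N is large, because it brings N rows back to the driver, and it ignores ties. Ranking the aggregated table with a window function such as `F.dense_rank()` and filtering on the desired rank would address both issues.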