# Initialize

In [82]:
import pyspark

from pyspark import SQLContext

from pyspark.sql.types import StructType, StructField, FloatType, BooleanType
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.sql import functions as F  # to make sure that Spark functions do not get confused with native Python functions, such as max.

In [34]:
# Environment: Windows 10. The following line converts to a path that Spark will understand.
import os
working_dir = '/'.join(os.getcwd().split("\\"))
working_dir

'C:/Users/Sinan Ozel/Jupyter/JobInterviews/PayTM/paytmteam-de-weather-challenge-beb4fc53605c'

In [3]:
conf = pyspark.SparkConf() 


In [7]:
sc = pyspark.SparkContext.getOrCreate(conf=conf)
sqlcontext = SQLContext(sc)

# Load Data

In [172]:
weather = sqlcontext\
    .read\
    .option("header", True)\
    .option("dateFormat", "yyyyMMdd")\
    .option("inferSchema","true")\
    .csv("./data/2019")\
    .withColumn('MEASUREMENT_DATE', F.to_date(F.col("YEARMODA").cast('STRING'), 'yyyyMMdd'))

In [173]:
weather.columns

['STN---',
 'WBAN',
 'YEARMODA',
 'TEMP',
 'DEWP',
 'SLP',
 'STP',
 'VISIB',
 'WDSP',
 'MXSPD',
 'GUST',
 'MAX',
 'MIN',
 'PRCP',
 'SNDP',
 'FRSHTT',
 'MEASUREMENT_DATE']

In [174]:
weather.printSchema()

root
 |-- STN---: integer (nullable = true)
 |-- WBAN: integer (nullable = true)
 |-- YEARMODA: integer (nullable = true)
 |-- TEMP: double (nullable = true)
 |-- DEWP: double (nullable = true)
 |-- SLP: double (nullable = true)
 |-- STP: double (nullable = true)
 |-- VISIB: double (nullable = true)
 |-- WDSP: double (nullable = true)
 |-- MXSPD: double (nullable = true)
 |-- GUST: double (nullable = true)
 |-- MAX: string (nullable = true)
 |-- MIN: string (nullable = true)
 |-- PRCP: string (nullable = true)
 |-- SNDP: double (nullable = true)
 |-- FRSHTT: integer (nullable = true)
 |-- MEASUREMENT_DATE: date (nullable = true)



In [175]:
weather.count()

4158416

In [176]:
countrylist = sqlcontext\
    .read\
    .option("header",True)\
    .format("csv")\
    .load("file:///" + working_dir + "/countrylist.csv")

In [177]:
type(countrylist)

pyspark.sql.dataframe.DataFrame

In [178]:
countrylist.columns

['COUNTRY_ABBR', 'COUNTRY_FULL']

In [179]:
countrylist.count()

288

In [180]:
stationlist = sqlcontext\
    .read\
    .option("header",True)\
    .format("csv")\
    .load("file:///" + working_dir + "/stationlist.csv")

In [181]:
type(stationlist)

pyspark.sql.dataframe.DataFrame

In [182]:
stationlist.count()

25306

In [183]:
stationlist.columns

['STN_NO', 'COUNTRY_ABBR']

In [184]:
weather.createOrReplaceTempView('weather')
countrylist.createOrReplaceTempView('countrylist')
stationlist.createOrReplaceTempView('stationlist')

# Step 1 - Setting Up the Data

In [185]:
# Quick visual check
weather.limit(25).toPandas()

Unnamed: 0,STN---,WBAN,YEARMODA,TEMP,DEWP,SLP,STP,VISIB,WDSP,MXSPD,GUST,MAX,MIN,PRCP,SNDP,FRSHTT,MEASUREMENT_DATE
0,10260,99999,20190101,26.1,21.2,1001.9,987.5,20.6,9.0,15.9,29.7,29.8,21.7*,0.02G,18.5,1000,2019-01-01
1,10260,99999,20190102,24.9,22.1,1020.1,1005.5,5.4,5.6,13.6,22.1,27.1*,20.7,0.48G,22.8,1000,2019-01-02
2,10260,99999,20190103,31.7,29.1,1008.9,994.7,13.6,11.6,21.4,49.5,37.4*,26.8*,0.25G,999.9,11000,2019-01-03
3,10260,99999,20190104,32.9,30.3,1011.4,997.1,15.8,4.9,7.8,10.9,36.1,31.8,0.52G,999.9,1000,2019-01-04
4,10260,99999,20190105,35.5,33.0,1015.7,1001.4,12.0,10.4,13.6,21.0,38.5*,32.7,0.02G,23.6,10000,2019-01-05
5,10260,99999,20190106,38.5,34.1,1008.2,994.2,12.8,10.0,17.5,28.9,41.4,33.8*,0.12G,23.2,10000,2019-01-06
6,10260,99999,20190107,32.1,29.8,996.8,982.7,6.9,11.3,15.5,28.6,35.1*,30.4,0.00G,999.9,1000,2019-01-07
7,10260,99999,20190108,31.6,28.0,997.4,983.3,22.9,5.9,11.7,19.0,34.3,28.0*,0.53G,0.4,11000,2019-01-08
8,10260,99999,20190109,29.9,27.7,1011.6,997.3,29.8,7.6,15.2,26.6,32.4,26.1,0.20G,23.6,1000,2019-01-09
9,10260,99999,20190110,33.1,30.6,979.1,965.3,5.3,17.8,24.9,41.8,41.4,28.8*,0.00G,999.9,11000,2019-01-10


In [186]:
# I chose to use the SparkSQL API to _optimize for readability_.
# I could have used the DataFrame API. I'll include that code if time allows.

full_weather = sqlcontext.sql("""
    SELECT *
    FROM weather w 
    LEFT JOIN stationlist s ON(w.`STN---` = s.STN_NO)
    LEFT JOIN countrylist c USING(COUNTRY_ABBR)
""")
full_weather.createOrReplaceTempView('full_weather')


In [187]:
full_weather.columns

['COUNTRY_ABBR',
 'STN---',
 'WBAN',
 'YEARMODA',
 'TEMP',
 'DEWP',
 'SLP',
 'STP',
 'VISIB',
 'WDSP',
 'MXSPD',
 'GUST',
 'MAX',
 'MIN',
 'PRCP',
 'SNDP',
 'FRSHTT',
 'MEASUREMENT_DATE',
 'STN_NO',
 'COUNTRY_FULL']

In [188]:
# Another quick visual check
full_weather.limit(5).toPandas().T

Unnamed: 0,0,1,2,3,4
COUNTRY_ABBR,NO,NO,NO,NO,NO
STN---,10260,10260,10260,10260,10260
WBAN,99999,99999,99999,99999,99999
YEARMODA,20190101,20190102,20190103,20190104,20190105
TEMP,26.1,24.9,31.7,32.9,35.5
DEWP,21.2,22.1,29.1,30.3,33
SLP,1001.9,1020.1,1008.9,1011.4,1015.7
STP,987.5,1005.5,994.7,997.1,1001.4
VISIB,20.6,5.4,13.6,15.8,12
WDSP,9,5.6,11.6,4.9,10.4


# Step 2 - Questions

## Question 1 - Which country had the hottest average mean temperature over the year?

In [189]:
# First I inspect the data and do a few "sanity checks"

In [190]:
# Now I am using Spark DataFrame API, because I think it is more readable.

full_weather.select('COUNTRY_FULL').distinct().count()

233

In [191]:

countrylist.select('COUNTRY_FULL').distinct().count()

287

In [192]:
# Not all countries are in the data, OK.

In [193]:
weather.select('STN---').distinct().count()

12144

In [194]:
full_weather.select('STN---').distinct().count()

12144

In [195]:
# No stations lost during join, good.

In [196]:
# Mixing DataFrame and SQL APIs. functions imported as F to make sure that it does not get confused with native Python functions, such as max.
full_weather\
    .withColumn('no_country_code', F.expr('ISNULL(COUNTRY_FULL)'))\
    .where('COUNTRY_FULL is false')\
    .count()

0

In [197]:
# No missing countries, good

In [198]:
full_weather.agg(F.min('YEARMODA'), F.max('YEARMODA')).show()

+-------------+-------------+
|min(YEARMODA)|max(YEARMODA)|
+-------------+-------------+
|     20190101|     20200101|
+-------------+-------------+



In [199]:
# One year + 1 day, OK.

In [200]:
full_weather.where(F.expr('ISNULL(COUNTRY_FULL)')).select('COUNTRY_ABBR').distinct().show()

+------------+
|COUNTRY_ABBR|
+------------+
|          UC|
|          RI|
|          OD|
|          AE|
|          KV|
|          MJ|
|          NN|
+------------+



In [201]:
# UC is probably unkknown country, but what are the others?
# I'll ignore these given time constraints.

In [202]:
full_weather.where(~F.expr('ISNULL(COUNTRY_FULL)')).count(), full_weather.where(~F.expr('ISNULL(COUNTRY_FULL)')).select(['STN---', 'YEARMODA']).distinct().count()

(4143167, 4065962)

In [203]:
# My granularity is probably Station x Date, with some dirty data.

In [204]:
%%time
# Using only the DataFrame API in this case to _optimize for readbility_.
# Also getting a sense of how much time is required.

full_weather\
    .where(full_weather['TEMP'] != 9999.9)\
    .select('TEMP')\
    .describe()\
    .show()

+-------+------------------+
|summary|              TEMP|
+-------+------------------+
|  count|           4161334|
|   mean|55.264967820415656|
| stddev| 23.27092727176463|
|    min|            -114.7|
|    max|             110.0|
+-------+------------------+

Wall time: 2.37 s


In [205]:
# Figures above are somewhat believable, I guess no other data missingness?
# WARNING: I make a mental note that perhaps data got cut off above 100 - a potential data issue
# Average temperatures should go above 100F in some countries.
# No immediate solution, so I keep on.

In [206]:
full_weather.columns

['COUNTRY_ABBR',
 'STN---',
 'WBAN',
 'YEARMODA',
 'TEMP',
 'DEWP',
 'SLP',
 'STP',
 'VISIB',
 'WDSP',
 'MXSPD',
 'GUST',
 'MAX',
 'MIN',
 'PRCP',
 'SNDP',
 'FRSHTT',
 'MEASUREMENT_DATE',
 'STN_NO',
 'COUNTRY_FULL']

## Answer to Question 1

In [207]:
# hottest average mean temperature:
# I interpret this as: average over all of the days in a year for each country. That the "average mean".
# Then choose the country with the highest "average mean temperature".

# I could have removed the duplicates, but there is very few of them, so I chose to keep them in until I understand why they exist.

sqlcontext.sql("""
WITH COUNTRY_AVERAGES AS (
    SELECT 
        COUNTRY_FULL
        , AVG(TEMP) AS COUNTRY_AVERAGE_MEAN_TEMP
    FROM 
        full_weather
    WHERE TRUE
        AND TEMP < 9999.9
        AND NOT ISNULL(COUNTRY_FULL)                  
    GROUP BY
        COUNTRY_FULL
)

SELECT 
    COUNTRY_FULL
    , COUNTRY_AVERAGE_MEAN_TEMP
FROM COUNTRY_AVERAGES
ORDER BY COUNTRY_AVERAGE_MEAN_TEMP DESC
LIMIT 1
""").show()

+------------+-------------------------+
|COUNTRY_FULL|COUNTRY_AVERAGE_MEAN_TEMP|
+------------+-------------------------+
|    DJIBOUTI|        90.06114457831325|
+------------+-------------------------+



In [208]:
# Djibouti - I believe this.

## Question 2 - Which country had the most consecutive days of tornadoes/funnel cloud formations?

In [214]:
tornadoes = full_weather\
    .select('COUNTRY_ABBR','COUNTRY_FULL','MEASUREMENT_DATE', 'FRSHTT')\
    .where(F.expr('NOT ISNULL(COUNTRY_FULL)'))\
    .withColumn('T', F.substring('FRSHTT', -1, 1) == 1)
tornadoes.createOrReplaceTempView('tornadoes')

In [215]:
tornadoes.where('T').count()

309

In [216]:
tornadoes.where('T').sample(.1).show()

+------------+-------------------+----------------+------+----+
|COUNTRY_ABBR|       COUNTRY_FULL|MEASUREMENT_DATE|FRSHTT|   T|
+------------+-------------------+----------------+------+----+
|          US|      UNITED STATES|      2019-06-30| 10011|true|
|          TH|           THAILAND|      2019-12-18|     1|true|
|          SC|ST. KITTS AND NEVIS|      2019-06-07| 10001|true|
|          BF|        BAHAMAS THE|      2019-10-07| 10011|true|
|          NI|            NIGERIA|      2019-10-04|    11|true|
|          CO|           COLOMBIA|      2019-12-05|110011|true|
|          IT|              ITALY|      2019-05-12| 10111|true|
|          IT|              ITALY|      2019-07-27| 10011|true|
|          IT|              ITALY|      2019-11-20|     1|true|
|          IT|              ITALY|      2019-07-06|     1|true|
|          TU|             TURKEY|      2019-02-01|110001|true|
|          SP|              SPAIN|      2019-04-24| 10001|true|
|          IN|              INDIA|      

## Answer to Question 2

In [232]:
sqlcontext.sql("""
WITH CONSECUTIVE_INTERVALS AS(
    SELECT COUNTRY_ABBR, COUNTRY_FULL, MEASUREMENT_DATE,
        DATE_ADD(MEASUREMENT_DATE,-ROW_NUMBER() OVER(PARTITION BY COUNTRY_FULL ORDER BY MEASUREMENT_DATE ASC)) as START_DATE
    FROM tornadoes
    WHERE T AND NOT ISNULL(COUNTRY_FULL)
    ORDER BY COUNTRY_FULL, MEASUREMENT_DATE
),
INTERVAL_LENGTHS AS(
    SELECT COUNTRY_FULL, DATEDIFF(MAX(MEASUREMENT_DATE), MIN(START_DATE)) AS LENGTH
    FROM CONSECUTIVE_INTERVALS
    GROUP BY COUNTRY_FULL
)
SELECT COUNTRY_FULL
FROM INTERVAL_LENGTHS
ORDER BY LENGTH DESC
LIMIT 1
""").show()

+--------------+
|  COUNTRY_FULL|
+--------------+
|CAYMAN ISLANDS|
+--------------+



## Question 3 - Which country had the second highest average mean wind speed over the year?

In [235]:
sqlcontext.sql("""
WITH COUNTRY_AVERAGES AS (
    SELECT 
        COUNTRY_FULL
        , AVG(WDSP) AS COUNTRY_AVERAGE_MEAN_WIND_SPEED
    FROM 
        full_weather
    WHERE TRUE
        AND WDSP < 999.9
        AND NOT ISNULL(COUNTRY_FULL)                  
    GROUP BY
        COUNTRY_FULL
)

SELECT 
    COUNTRY_FULL
    , COUNTRY_AVERAGE_MEAN_WIND_SPEED
FROM COUNTRY_AVERAGES
ORDER BY COUNTRY_AVERAGE_MEAN_WIND_SPEED DESC
LIMIT 2
""").show()

+--------------------+-------------------------------+
|        COUNTRY_FULL|COUNTRY_AVERAGE_MEAN_WIND_SPEED|
+--------------------+-------------------------------+
|FALKLAND ISLANDS ...|             17.877833001988083|
|               ARUBA|             15.975683060109283|
+--------------------+-------------------------------+



In [None]:
# The answer is Aruba