In [2]:
import findspark

In [3]:
findspark.init()

In [4]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import Row, Window
from pyspark.sql import DataFrame

from datetime import date
import tarfile
from io import BytesIO

In [5]:
import pandas as pd
import math
import numpy as np
import os
from pprint import pprint

In [6]:
# create Spark context with Spark configuration
conf = SparkConf().setAppName("Paytm Solution - Python").set(
    "spark.hadoop.yarn.resourcemanager.address", "192.168.0.104:8032")
sc = SparkContext(conf=conf)

# sc.stop()

spark = SparkSession.builder.master("local").appName("Paytm Soln").config(
    "spark.some.config.option", "some-value").getOrCreate()

### Step 1 - Setting Up the Data

1. Load the global weather data into your big data technology of choice.
2. Join the stationlist.csv with the countrylist.csv to get the full country name for each station number.
3. Join the global weather data with the full country names by station number.

We can now begin to answer the weather questions! 

In [7]:
df_wd = spark.read.option("header",True).csv("data/2019/*.csv.gz")

print('\n1.1 a) Display Schema of global weather dataset table\n')
df_wd.printSchema()

#Display the contents of global weather data
print('\n1.1 b) Display contents of global weather dataset table\n')
df_wd.show(5)


1.1 a) Display Schema of global weather dataset table

root
 |-- STN---: string (nullable = true)
 |-- WBAN: string (nullable = true)
 |-- YEARMODA: string (nullable = true)
 |-- TEMP: string (nullable = true)
 |-- DEWP: string (nullable = true)
 |-- SLP: string (nullable = true)
 |-- STP: string (nullable = true)
 |-- VISIB: string (nullable = true)
 |-- WDSP: string (nullable = true)
 |-- MXSPD: string (nullable = true)
 |-- GUST: string (nullable = true)
 |-- MAX: string (nullable = true)
 |-- MIN: string (nullable = true)
 |-- PRCP: string (nullable = true)
 |-- SNDP: string (nullable = true)
 |-- FRSHTT: string (nullable = true)


1.1 b) Display contents of global weather dataset table

+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+-----+-----+------+
|STN---| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD|GUST|  MAX|  MIN| PRCP| SNDP|FRSHTT|
+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+-----+-----+---

In [8]:
df_sl = spark.read.option("header",True).csv("stationlist.csv")
print('\nDisplay Schema of stationlist.csv dataset table\n')
df_sl.printSchema()

#Display the contents of station data
print('\nDisplay contents of stationlist.csv dataset table\n')
df_sl.show(5)    


Display Schema of stationlist.csv dataset table

root
 |-- STN_NO: string (nullable = true)
 |-- COUNTRY_ABBR: string (nullable = true)


Display contents of stationlist.csv dataset table

+------+------------+
|STN_NO|COUNTRY_ABBR|
+------+------------+
|012240|          NO|
|020690|          SW|
|020870|          SW|
|021190|          SW|
|032690|          UK|
+------+------------+
only showing top 5 rows



In [9]:
df_cl = spark.read.option("header",True).csv("countrylist.csv")
print('\nDisplay Schema of countrylist.csv dataset table\n')
df_cl.printSchema()

#Display the contents of countrylist data
print('\nDisplay contents of countrylist.csv dataset table\n')
df_cl.show(5)    


Display Schema of countrylist.csv dataset table

root
 |-- COUNTRY_ABBR: string (nullable = true)
 |-- COUNTRY_FULL: string (nullable = true)


Display contents of countrylist.csv dataset table

+------------+-------------------+
|COUNTRY_ABBR|       COUNTRY_FULL|
+------------+-------------------+
|          AA|              ARUBA|
|          AC|ANTIGUA AND BARBUDA|
|          AF|        AFGHANISTAN|
|          AG|            ALGERIA|
|          AI|   ASCENSION ISLAND|
+------------+-------------------+
only showing top 5 rows



In [10]:
df_clsl = df_sl.join(df_cl, df_sl.COUNTRY_ABBR == df_cl.COUNTRY_ABBR, how = 'Left').drop(df_cl.COUNTRY_ABBR)

print('\n1.2) Display contents of JOIN of stationlist.csv and countrylist.csv dataset tables\n')
df_clsl.show(10)


1.2) Display contents of JOIN of stationlist.csv and countrylist.csv dataset tables

+------+------------+--------------+
|STN_NO|COUNTRY_ABBR|  COUNTRY_FULL|
+------+------------+--------------+
|012240|          NO|        NORWAY|
|020690|          SW|        SWEDEN|
|020870|          SW|        SWEDEN|
|021190|          SW|        SWEDEN|
|032690|          UK|UNITED KINGDOM|
|033450|          UK|UNITED KINGDOM|
|039290|          UK|UNITED KINGDOM|
|039790|          EI|       IRELAND|
|040480|          IC|       ICELAND|
|041300|          IC|       ICELAND|
+------+------------+--------------+
only showing top 10 rows



In [11]:
#3. Join the global weather data with the full country names by station number.
df_wdclsl = df_wd.join(df_clsl, df_wd['STN---'] == df_clsl.STN_NO, how = 'Left').drop(
    df_clsl['STN_NO']).drop(df_clsl['COUNTRY_ABBR'])

print('\n1.3) Display contents of global weather data and full country names by station number\n')
df_wdclsl.show(5)


1.3) Display contents of global weather data and full country names by station number

+------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+------------+
|STN---| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD| GUST|  MAX|  MIN| PRCP| SNDP|FRSHTT|COUNTRY_FULL|
+------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+------------+
|010875|99999|20190101|41.1|30.1|9999.9|9999.9|  5.9|46.7| 59.1| 74.0|44.6*|37.4*|99.99|999.9|011010|      NORWAY|
|010875|99999|20190102|40.5|29.0|9999.9|9999.9|  6.2|20.5| 32.1| 44.1|41.0*|37.4*|99.99|999.9|010000|      NORWAY|
|010875|99999|20190103|43.0|36.6|9999.9|9999.9|  6.1|13.5| 21.0|999.9|44.6*|41.0*|0.00I|999.9|000000|      NORWAY|
|010875|99999|20190104|46.7|44.4|9999.9|9999.9|  5.8|27.4| 33.0| 40.0|48.2*|42.8*|99.99|999.9|010000|      NORWAY|
|010875|99999|20190105|46.5|44.1|9999.9|9999.9|  6.1|18.3| 25.1|999.9|48.2*|44.6*|99.99|999.9|010000|      

### Step 2 - Questions
Using the global weather data, answer the following:

1. Which country had the hottest average mean temperature over the year?
2. Which country had the most consecutive days of tornadoes/funnel cloud formations?
3. Which country had the second highest average mean wind speed over the year?

In [12]:
df_2p1 = df_wdclsl.groupby('COUNTRY_FULL').agg(F.avg('TEMP').alias('AVG_TEMP')).sort(F.col("AVG_TEMP").desc())

print('\n 2.1) The country that had the hottest average mean temperature over the year\n')
df_2p1.show(1)


 2.1) The country that had the hottest average mean temperature over the year

+------------+-----------------+
|COUNTRY_FULL|         AVG_TEMP|
+------------+-----------------+
|    DJIBOUTI|90.06114457831325|
+------------+-----------------+
only showing top 1 row



In [13]:
#2. Which country had the most consecutive days of tornadoes/funnel cloud formations?

df_2p2 = df_wdclsl.filter(F.col('FRSHTT').substr(6, 1) == '1')

df_2p2.show(5)

+------+-----+--------+----+----+------+------+-----+-----+-----+-----+-----+-----+-----+-----+------+-------------+
|STN---| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB| WDSP|MXSPD| GUST|  MAX|  MIN| PRCP| SNDP|FRSHTT| COUNTRY_FULL|
+------+-----+--------+----+----+------+------+-----+-----+-----+-----+-----+-----+-----+-----+------+-------------+
|726797|24132|20190616|62.8|49.6|1013.4| 863.4| 10.0|  4.3| 13.0| 29.9| 79.0| 45.0|0.02G|999.9|010011|UNITED STATES|
|655280|99999|20191027|80.6|72.8|1009.3| 963.2|  5.0|999.9|999.9|999.9|92.1*| 69.1|0.00I|999.9|000011|COTE D'IVOIRE|
|123850|99999|20190521|61.1|56.1|1004.7| 986.7| 15.9|  6.0| 11.7|999.9| 70.9|53.2*|0.80G|999.9|010011|       POLAND|
|718432|99999|20190515|32.0|28.2|1007.0|9999.9|  7.4| 13.2| 15.0| 15.9|33.8*|30.2*|99.99|999.9|010001|       CANADA|
|718432|99999|20190830|48.7|47.5| 989.9|9999.9|  1.9| 15.8| 18.1| 25.1|50.0*|46.4*|99.99|999.9|010001|       CANADA|
+------+-----+--------+----+----+------+------+-----+-----+-----

In [14]:
df_2p2 = df_2p2.withColumn('DATENUM', F.col('YEARMODA').cast(DoubleType()))

w1 = Window.partitionBy(df_2p2.COUNTRY_FULL).orderBy(df_2p2.DATENUM)

In [15]:
df = df_2p2.withColumn("row_no", F.row_number().over(w1))\
           .withColumn("diff", F.when(F.isnull(F.col('DATENUM') - F.col('row_no')), 0)
                                .otherwise(F.col('DATENUM') - F.col('row_no'))
                      )

print ("\n All consecutive dates have same value in column 'diff'\n")
df.show(5)


 All consecutive dates have same value in column 'diff'

+------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+------------+-----------+------+-----------+
|STN---| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD| GUST|  MAX|  MIN| PRCP| SNDP|FRSHTT|COUNTRY_FULL|    DATENUM|row_no|       diff|
+------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+------------+-----------+------+-----------+
|418830|99999|20190822|86.4|79.9|1001.6| 999.6|  2.9| 1.2|  1.9|999.9| 97.5|82.4*|0.04G|999.9|010011|  BANGLADESH|2.0190822E7|     1|2.0190821E7|
|477040|99999|20190117|41.1|31.7|9999.9|9999.9|  6.0|13.3| 25.1| 35.9|44.6*|35.6*|99.99|999.9|011111|       JAPAN|2.0190117E7|     1|2.0190116E7|
|475730|43318|20190124|35.0|27.6|9999.9|9999.9|  4.9|18.4| 31.1| 41.0|37.4*|32.0*|99.99|999.9|001001|       JAPAN|2.0190124E7|     2|2.0190122E7|
|479276|99999|20190302|75.8|69.9|9999.9|9999.9|  5.9|10.0| 15.9|99

In [16]:
df1 = df.groupby('COUNTRY_FULL', 'diff').agg(F.count('*').alias('num_consecutive'))
print ("\n GroupBy and Count the values in column 'diff'\n")  
df1.show(5)


 GroupBy and Count the values in column 'diff'

+------------+-----------+---------------+
|COUNTRY_FULL|       diff|num_consecutive|
+------------+-----------+---------------+
|  BANGLADESH|2.0190821E7|              1|
|       JAPAN|2.0190116E7|              1|
|       JAPAN|2.0190122E7|              1|
|       JAPAN|2.0190299E7|              1|
|       JAPAN|2.0190398E7|              1|
+------------+-----------+---------------+
only showing top 5 rows



In [17]:
df2 = df1.groupby('COUNTRY_FULL').agg(F.max('num_consecutive').alias('max_num_consecutive'))\
         .sort(F.col("max_num_consecutive").desc())
print ("\n GroupBy, find maximum value of column 'num_consecutive' and display sorted result\n")
print ("\n 2.2) The country that had the most consecutive days of tornadoes/funnel cloud formations is: ")

df2.show(1)


 GroupBy, find maximum value of column 'num_consecutive' and display sorted result


 2.2) The country that had the most consecutive days of tornadoes/funnel cloud formations is: 
+------------+-------------------+
|COUNTRY_FULL|max_num_consecutive|
+------------+-------------------+
|       ITALY|                  3|
+------------+-------------------+
only showing top 1 row



In [18]:
#3. Which country had the second highest average mean wind speed over the year?
df_2p3 = df_wdclsl.withColumn('WDSPNUM', F.col('WDSP').cast(DoubleType()))\
                  .groupby('COUNTRY_FULL').agg(F.avg('WDSP').alias('AVG_WDSP'))\
                  .withColumn('rn', F.row_number().over(
                                                Window.orderBy(F.col("AVG_WDSP").desc())
                                                )
                             )\
                  .where(F.col('rn') == 2).drop('rn')                             

print('\n 2.3) The country that had the second highest average mean wind speed over the year\n')
df_2p3.select(*['COUNTRY_FULL'], *[F.round(c, 2).alias(c) for c in df_2p3.columns[1:] ]).show()


 2.3) The country that had the second highest average mean wind speed over the year

+------------+--------+
|COUNTRY_FULL|AVG_WDSP|
+------------+--------+
|     ARMENIA|  457.37|
+------------+--------+

