In [1]:
import pyspark

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
         .master("local[4]") \
         .config("spark.executor.memory", "2g") \
         .config("spark.driver.memory", "8g") \
         .config("spark.memory.offHeap.enabled",True) \
         .config("spark.memory.offHeap.size","2g") \
         .config("spark.python.worker.memory","1g") \
         .appName('Paytm').getOrCreate()

In [132]:
from pyspark.sql.functions import *
import pandas as pd
import seaborn as sns
from pyspark.sql.functions import col,sum
import numpy as np
from pyspark.sql.functions import when
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
from pyspark.sql.types import DoubleType
from pyspark.sql import Window

In [52]:
####Importing data

countrylist = spark.read.csv('/home/jovyan/work/paytmteam-de-weather-challenge-b01d5ebbf02d/countrylist.csv', 
                             header=True, sep=',', inferSchema=True)


stationlist = spark.read.csv('/home/jovyan/work/paytmteam-de-weather-challenge-b01d5ebbf02d/stationlist.csv', 
                             header=True, sep=',', inferSchema=True)

global_weather = spark.read.csv('/home/jovyan/work/paytmteam-de-weather-challenge-b01d5ebbf02d/data/2019/*.csv', 
                             header=True, sep=',', inferSchema=True)

In [18]:
global_weather.columns

['STN---',
 'WBAN',
 'YEARMODA',
 'TEMP',
 'DEWP',
 'SLP',
 'STP',
 'VISIB',
 'WDSP',
 'MXSPD',
 'GUST',
 'MAX',
 'MIN',
 'PRCP',
 'SNDP',
 'FRSHTT']

In [53]:
global_weather = global_weather.withColumnRenamed("STN---", "STN_NO")

In [7]:
stationlist.head(5)

[Row(STN_NO='012240', COUNTRY_ABBR='NO'),
 Row(STN_NO='020690', COUNTRY_ABBR='SW'),
 Row(STN_NO='020870', COUNTRY_ABBR='SW'),
 Row(STN_NO='021190', COUNTRY_ABBR='SW'),
 Row(STN_NO='032690', COUNTRY_ABBR='UK')]

In [8]:
countrylist.head(5)

[Row(COUNTRY_ABBR='AA', COUNTRY_FULL='ARUBA'),
 Row(COUNTRY_ABBR='AC', COUNTRY_FULL='ANTIGUA AND BARBUDA'),
 Row(COUNTRY_ABBR='AF', COUNTRY_FULL='AFGHANISTAN'),
 Row(COUNTRY_ABBR='AG', COUNTRY_FULL='ALGERIA'),
 Row(COUNTRY_ABBR='AI', COUNTRY_FULL='ASCENSION ISLAND')]

In [78]:
station_df = stationlist.join(countrylist, on = ["COUNTRY_ABBR"], how= "left")
globalDF = global_weather.join(station_df, on = ["STN_NO"], how= "left")

In [55]:
globalDF.printSchema()

root
 |-- STN_NO: integer (nullable = true)
 |-- WBAN: integer (nullable = true)
 |-- YEARMODA: integer (nullable = true)
 |-- TEMP: double (nullable = true)
 |-- DEWP: double (nullable = true)
 |-- SLP: double (nullable = true)
 |-- STP: double (nullable = true)
 |-- VISIB: double (nullable = true)
 |-- WDSP: double (nullable = true)
 |-- MXSPD: double (nullable = true)
 |-- GUST: double (nullable = true)
 |-- MAX: string (nullable = true)
 |-- MIN: string (nullable = true)
 |-- PRCP: string (nullable = true)
 |-- SNDP: double (nullable = true)
 |-- FRSHTT: integer (nullable = true)
 |-- COUNTRY_ABBR: string (nullable = true)
 |-- COUNTRY_FULL: string (nullable = true)



In [81]:
globalDf = globalDF.withColumn("MAX", globalDF["MAX"].cast("double"))
globalDf = globalDF.withColumn("MIN", globalDF["MIN"].cast("double"))
globalDf = globalDF.withColumn("PRCP", globalDF["PRCP"].cast("double"))
globalDf = globalDF.withColumn("FRSHTT", globalDF["FRSHTT"].cast("string"))

In [57]:
globalDF = globalDF.withColumn("TEMP", when(globalDF["TEMP"] == 9999.9, lit(None)).otherwise(globalDF["TEMP"]))
globalDF = globalDF.withColumn("DEWP", when(globalDF["DEWP"] == 9999.9, lit(None)).otherwise(globalDF["DEWP"]))
globalDF = globalDF.withColumn("SLP", when(globalDF["SLP"] == 9999.9, lit(None)).otherwise(globalDF["SLP"]))
globalDF = globalDF.withColumn("STP", when(globalDF["STP"] == 9999.9, lit(None)).otherwise(globalDF["STP"]))
globalDF = globalDF.withColumn("VISIB", when(globalDF["VISIB"] == 999.9, lit(None)).otherwise(globalDF["VISIB"]))
globalDF = globalDF.withColumn("WDSP", when(globalDF["WDSP"] == 999.9, lit(None)).otherwise(globalDF["WDSP"]))
globalDF = globalDF.withColumn("MXSPD", when(globalDF["MXSPD"] == 999.9, lit(None)).otherwise(globalDF["MXSPD"]))
globalDF = globalDF.withColumn("GUST", when(globalDF["GUST"] == 999.9, lit(None)).otherwise(globalDF["GUST"]))
globalDF = globalDF.withColumn("MAX", when(globalDF["MAX"] == 9999.9, lit(None)).otherwise(globalDF["MAX"]))
globalDF = globalDF.withColumn("MIN", when(globalDF["MIN"] == 9999.9, lit(None)).otherwise(globalDF["MIN"]))
globalDF = globalDF.withColumn("PRCP", when(globalDF["PRCP"] == 99.9, lit(None)).otherwise(globalDF["PRCP"]))
globalDF = globalDF.withColumn("SNDP", when(globalDF["SNDP"] == 999.9, lit(None)).otherwise(globalDF["SNDP"]))


In [None]:
## Replacing the missing values with the mean of the respective column

In [58]:
# Columns with missing values
missing_columns = ['TEMP', 'DEWP', 'SLP', 'STP', 'VISIB', 'WDSP', 'MXSPD', 'GUST', 'MAX', 'MIN', 'PRCP', 'SNDP']

# Calculate the mean value
for c in missing_columns :
    col_mean = globalDF.agg({c: 'mean'}).collect()[0][0]
    # Replacing with the mean value for that column
    globalDF = globalDF.fillna(col_mean, subset=[c])

In [59]:
### Top 5 Countries with hottest mean average temprature over the year

In [69]:
globalDF.groupBy("COUNTRY_FULL").agg(avg('TEMP').alias("AVERAGE")).sort(desc("AVERAGE")).show(5)

+------------+-----------------+
|COUNTRY_FULL|          AVERAGE|
+------------+-----------------+
|    DJIBOUTI|90.06114457831325|
|        CHAD|87.36099706744865|
|       NIGER|85.06022291247945|
|       SUDAN|84.45494186046511|
| EL SALVADOR|84.44045944678854|
+------------+-----------------+
only showing top 5 rows



In [70]:
### Top 5 Countries with coldest mean average temprature over the year

globalDF.groupBy("COUNTRY_FULL").agg(avg('TEMP').alias("AVERAGE")).sort("AVERAGE").show(5)

+------------+-------------------+
|COUNTRY_FULL|            AVERAGE|
+------------+-------------------+
|  ANTARCTICA|-2.8328838863485393|
|    SVALBARD|           23.64208|
|   GREENLAND| 25.525825043423335|
|      RUSSIA| 31.378486130244845|
|    MONGOLIA|  34.50474591250553|
+------------+-------------------+
only showing top 5 rows



In [None]:
### Top 5 Countries with highest average mean wind speed over the year

In [73]:
globalDF.groupBy("COUNTRY_FULL").agg(avg('WDSP').alias("AVERAGE")).sort(desc("AVERAGE")).show(5, False)

+-----------------------------------+------------------+
|COUNTRY_FULL                       |AVERAGE           |
+-----------------------------------+------------------+
|FALKLAND ISLANDS (ISLAS MALVINAS)  |17.854922323310944|
|ARUBA                              |15.975683060109283|
|FAROE ISLANDS                      |15.269151674322673|
|BARBADOS                           |14.097540983606562|
|FRENCH SOUTHERN AND ANTARCTIC LANDS|14.09722890172177 |
+-----------------------------------+------------------+
only showing top 5 rows



In [None]:
### Top Country with the moset consecutive days of tornados/funnel cloud over the year

In [107]:
### Padding leading zero to the column 
globalDF = globalDF.withColumn("FRSHTT", lpad(globalDF["FRSHTT"], 6, '0'))

In [136]:
globalDF = globalDF.withColumn('FRSHTT_TORNADO', globalDF["FRSHTT"].substr(6, 1).cast("int"))

In [143]:
globalDF = globalDF.withColumn('cumsum', sum('FRSHTT_TORNADO').over(Window.partitionBy('COUNTRY_FULL').orderBy('YEARMODA')))

In [152]:
globalDF.select(["COUNTRY_FULL", "cumsum"]).distinct().sort(desc("cumsum")).first()

Row(COUNTRY_FULL='UNITED STATES', cumsum=34)