## Overview

This notebook contains the solution to the PayTM coding challenge for the MLE position.

In [0]:
from pyspark.sql.functions import isnan, when, count, col, desc, avg, rank
from pyspark.sql.functions import to_date, row_number
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import Imputer

In [0]:
# Input: every `*.gz` file under the 2019 data folder, read as CSV.
file_location = "/FileStore/tables/data/2019/*.gz"
file_type = "csv"

# Parsing settings; they only take effect for CSV input.
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Build the weather DataFrame with a parenthesised reader chain.
df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

display(df)

STN---,WBAN,YEARMODA,TEMP,DEWP,SLP,STP,VISIB,WDSP,MXSPD,GUST,MAX,MIN,PRCP,SNDP,FRSHTT
10260,99999,20190101,26.1,21.2,1001.9,987.5,20.6,9.0,15.9,29.7,29.8,21.7*,0.02G,18.5,1000
10260,99999,20190102,24.9,22.1,1020.1,1005.5,5.4,5.6,13.6,22.1,27.1*,20.7,0.48G,22.8,1000
10260,99999,20190103,31.7,29.1,1008.9,994.7,13.6,11.6,21.4,49.5,37.4*,26.8*,0.25G,999.9,11000
10260,99999,20190104,32.9,30.3,1011.4,997.1,15.8,4.9,7.8,10.9,36.1,31.8,0.52G,999.9,1000
10260,99999,20190105,35.5,33.0,1015.7,1001.4,12.0,10.4,13.6,21.0,38.5*,32.7,0.02G,23.6,10000
10260,99999,20190106,38.5,34.1,1008.2,994.2,12.8,10.0,17.5,28.9,41.4,33.8*,0.12G,23.2,10000
10260,99999,20190107,32.1,29.8,996.8,982.7,6.9,11.3,15.5,28.6,35.1*,30.4,0.00G,999.9,1000
10260,99999,20190108,31.6,28.0,997.4,983.3,22.9,5.9,11.7,19.0,34.3,28.0*,0.53G,0.4,11000
10260,99999,20190109,29.9,27.7,1011.6,997.3,29.8,7.6,15.2,26.6,32.4,26.1,0.20G,23.6,1000
10260,99999,20190110,33.1,30.6,979.1,965.3,5.3,17.8,24.9,41.8,41.4,28.8*,0.00G,999.9,11000


In [0]:
# Show the column names and the types produced by schema inference.
df.printSchema()

In [0]:
# Rename the station-id column to STN_NO, the key name used when joining with the station list.
df = df.withColumnRenamed("STN---","STN_NO")

In [0]:
# Print the first four rows to confirm the rename.
df.show(4)

In [0]:
# Register the weather DataFrame as a temporary view so SQL cells can query it by name.

temp_table_name = "globalweathertable"

df.createOrReplaceTempView(temp_table_name)

In [0]:
%sql

/* Preview every column of the temp view registered from the weather DataFrame. */

select * from `globalweathertable`

STN---,WBAN,YEARMODA,TEMP,DEWP,SLP,STP,VISIB,WDSP,MXSPD,GUST,MAX,MIN,PRCP,SNDP,FRSHTT
10260,99999,20190101,26.1,21.2,1001.9,987.5,20.6,9.0,15.9,29.7,29.8,21.7*,0.02G,18.5,1000
10260,99999,20190102,24.9,22.1,1020.1,1005.5,5.4,5.6,13.6,22.1,27.1*,20.7,0.48G,22.8,1000
10260,99999,20190103,31.7,29.1,1008.9,994.7,13.6,11.6,21.4,49.5,37.4*,26.8*,0.25G,999.9,11000
10260,99999,20190104,32.9,30.3,1011.4,997.1,15.8,4.9,7.8,10.9,36.1,31.8,0.52G,999.9,1000
10260,99999,20190105,35.5,33.0,1015.7,1001.4,12.0,10.4,13.6,21.0,38.5*,32.7,0.02G,23.6,10000
10260,99999,20190106,38.5,34.1,1008.2,994.2,12.8,10.0,17.5,28.9,41.4,33.8*,0.12G,23.2,10000
10260,99999,20190107,32.1,29.8,996.8,982.7,6.9,11.3,15.5,28.6,35.1*,30.4,0.00G,999.9,1000
10260,99999,20190108,31.6,28.0,997.4,983.3,22.9,5.9,11.7,19.0,34.3,28.0*,0.53G,0.4,11000
10260,99999,20190109,29.9,27.7,1011.6,997.3,29.8,7.6,15.2,26.6,32.4,26.1,0.20G,23.6,1000
10260,99999,20190110,33.1,30.6,979.1,965.3,5.3,17.8,24.9,41.8,41.4,28.8*,0.00G,999.9,11000


In [0]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

#permanent_table_name = "global_weather_table"

#df.write.format("parquet").saveAsTable(permanent_table_name)

In [0]:
# Lookup file pairing each country abbreviation with its full name.
file_location = "/FileStore/tables/countrylist.csv"
file_type = "csv"

# Parsing settings; they only take effect for CSV input.
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Read the lookup with a parenthesised reader chain.
country_list = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

display(country_list)

COUNTRY_ABBR,COUNTRY_FULL
AA,ARUBA
AC,ANTIGUA AND BARBUDA
AF,AFGHANISTAN
AG,ALGERIA
AI,ASCENSION ISLAND
AJ,AZERBAIJAN
AL,ALBANIA
AM,ARMENIA
AN,ANDORRA
AO,ANGOLA


In [0]:
# Lookup file pairing each station number with a country abbreviation.
file_location = "/FileStore/tables/stationlist.csv"
file_type = "csv"

# Parsing settings; they only take effect for CSV input.
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Read the lookup with a parenthesised reader chain.
station_list = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

display(station_list)



STN_NO,COUNTRY_ABBR
012240,NO
020690,SW
020870,SW
021190,SW
032690,UK
033450,UK
039290,UK
039790,EI
040480,IC
041300,IC


In [0]:
# Attach the full country name to every station. The left join keeps stations
# whose abbreviation has no match in the country list (COUNTRY_FULL is null there).
country_by_station = station_list.join(
    country_list,
    on="COUNTRY_ABBR",
    how="left",
)
country_by_station.show(5)

In [0]:
# Count missing values per column after the left join.
# Unmatched rows from a left join are filled with NULL, which isnan() never
# reports (it only matches the floating-point NaN value), so test for both.
country_by_station.select(
    [
        count(when(isnan(c) | col(c).isNull(), c)).alias(c)
        for c in country_by_station.columns
    ]
).show()

In [0]:
# Enrich every weather observation with its station's country columns. The left
# join keeps observations from stations that are absent from the lookup.
global_weather_table = df.join(
    country_by_station,
    on="STN_NO",
    how="left",
)
global_weather_table.show()

In [0]:
# Count missing values per column in the global weather table.
# Rows without a matching station carry NULL country fields; isnan() alone only
# matches floating-point NaN and would report 0 for them, so test for both.
global_weather_table.select(
    [
        count(when(isnan(c) | col(c).isNull(), c)).alias(c)
        for c in global_weather_table.columns
    ]
).show()

In [0]:
# Experimental imputation over the raw weather DataFrame.
# pyspark's Imputer only accepts numeric input columns, so restrict it to the
# floating-point columns; columns inferred as strings would make fit() fail.
input_col = [name for name, dtype in df.dtypes if dtype in ("double", "float")]
imputer = Imputer(
    inputCols=input_col,
    outputCols=["{}_imputed".format(c) for c in input_col],
)

# transform() returns a new DataFrame, so keep the result instead of discarding it.
df_imputed = imputer.fit(df).transform(df)

# The former scikit-learn style call (missing_values=/axis= keywords and an
# undefined `data` frame) was removed: pyspark.ml.feature.Imputer has no such
# parameters and the cell raised before completing.
# NOTE(review): placeholder codes visible in the data (e.g. 9999.9 / 999.9) are
# not treated as missing here - confirm whether they should be nulled first.

In [0]:
# Replace the 9999.9 placeholder in TEMP with null; every other value is kept.
global_weather_table = global_weather_table.withColumn(
    "TEMP",
    when(global_weather_table["TEMP"] == 9999.9, None)
    .otherwise(global_weather_table["TEMP"]),
)

In [0]:
# Median-impute TEMP on the joined table, where the 9999.9 placeholder was
# replaced with null.
# inputCols/outputCols must be lists of column *names*; the previous version
# passed a Column object and iterated over one, which is why it never ran.
imputer = Imputer(
    inputCols=["TEMP"],
    outputCols=["TEMP_imputed"],
).setStrategy("median")

# Fit and transform the same table that had the placeholder nulled. The result
# gets its own name so re-running this cell does not collide with an existing
# TEMP_imputed column and the original TEMP stays available to later cells.
global_weather_table_imputed = imputer.fit(global_weather_table).transform(global_weather_table)

In [0]:
# Question 1: the single country with the highest average TEMP.
q1 = (
    global_weather_table
    .select('COUNTRY_FULL', 'TEMP')
    .groupby('COUNTRY_FULL')
    .agg(avg(col("TEMP")).alias('AVG_TEMP'))
    .orderBy(desc('AVG_TEMP'))
    .limit(1)
)
q1.show()

In [0]:
# The single country with the highest average WDSP.
q3 = (
    global_weather_table
    .select('COUNTRY_FULL', 'WDSP')
    .groupby('COUNTRY_FULL')
    .agg(avg(col("WDSP")).alias('AVG_WIND_SPEED'))
    .orderBy(desc('AVG_WIND_SPEED'))
    .limit(1)
)
q3.show()

In [0]:
# Question 3: the country with the second-highest average WDSP.
avg_df = (
    global_weather_table
    .select('COUNTRY_FULL', 'WDSP')
    .groupby('COUNTRY_FULL')
    .agg(avg(col("WDSP")).alias('AVG_WIND_SPEED'))
)

# Rank all countries against each other, highest average first.
# The window must NOT be partitioned by COUNTRY_FULL: avg_df already has one row
# per country, so a per-country partition gives every row rank 1 and the
# `rank == 2` filter returns nothing. An unpartitioned window pulls all rows
# into one partition, which is fine for a one-row-per-country table.
windowSpec = Window.orderBy(desc('AVG_WIND_SPEED'))

# dense_rank keeps rank 2 present even if several countries tie for first place.
# Keep the DataFrame itself in ranked_df; .show() returns None, so it is called
# separately.
ranked_df = (
    avg_df
    .withColumn("rank", F.dense_rank().over(windowSpec))
    .filter(col("rank") == 2)
)
ranked_df.show()
# NOTE(review): rows with a null COUNTRY_FULL (stations missing from the lookup)
# and any placeholder wind-speed values still take part in the ranking - confirm
# whether they should be excluded first.