In [1]:
# Import libraries
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
# Create (or reuse) the Spark session that the data-loading cells read from.
# The application name previously read 'whether'; it is the weather data set.
spark = (
    SparkSession.builder
    .appName('weather data exploration')
    .getOrCreate()
)
print('Session created')

In [3]:
# List everything stored under the input folder and render it as a table
input_dir = "/FileStore/tables/new/"
files = dbutils.fs.ls(input_dir)
display(files)

path,name,size
dbfs:/FileStore/tables/new/countrylist.csv,countrylist.csv,4373
dbfs:/FileStore/tables/new/part_00000_890686c0_c142_4c69_a744_dfdc9eca7df4_c000_csv.gz,part_00000_890686c0_c142_4c69_a744_dfdc9eca7df4_c000_csv.gz,19893364
dbfs:/FileStore/tables/new/part_00001_890686c0_c142_4c69_a744_dfdc9eca7df4_c000_csv.gz,part_00001_890686c0_c142_4c69_a744_dfdc9eca7df4_c000_csv.gz,18748869
dbfs:/FileStore/tables/new/part_00002_890686c0_c142_4c69_a744_dfdc9eca7df4_c000_csv.gz,part_00002_890686c0_c142_4c69_a744_dfdc9eca7df4_c000_csv.gz,17572400
dbfs:/FileStore/tables/new/part_00003_890686c0_c142_4c69_a744_dfdc9eca7df4_c000_csv.gz,part_00003_890686c0_c142_4c69_a744_dfdc9eca7df4_c000_csv.gz,15481340
dbfs:/FileStore/tables/new/part_00004_890686c0_c142_4c69_a744_dfdc9eca7df4_c000_csv.gz,part_00004_890686c0_c142_4c69_a744_dfdc9eca7df4_c000_csv.gz,9625851
dbfs:/FileStore/tables/new/stationlist.csv,stationlist.csv,253080


In [4]:
# Weather observations: glob matching every .gz file in the input folder
file_location = "/FileStore/tables/new/*.gz"

# Explicit schema, all columns nullable, in file column order:
# three integer columns, twelve double-valued measurements, then the
# FRSHTT column kept as a string.
Schema = StructType(
  [
    StructField(name, IntegerType(), True)
    for name in ("STN---", "WBAN", "YEARMODA")
  ]
  + [
    StructField(name, DoubleType(), True)
    for name in (
      "TEMP", "DEWP", "SLP", "STP", "VISIB", "WDSP",
      "MXSPD", "GUST", "MAX", "MIN", "PRCP", "SNDP",
    )
  ]
  + [StructField("FRSHTT", StringType(), True)]
)

# Read the CSV files with the schema above rather than inferring types
df = (
  spark.read
  .options(header="true", delimiter=",")
  .schema(Schema)
  .csv(file_location)
)

# Preview the first rows
df.show(5)

In [5]:
# Replace nulls in each measurement column with that column's
# "missing value" marker, all in a single fillna call.
missing_markers = {
  'TEMP': '9999.9',
  'DEWP': '9999.9',
  'SLP': '9999.9',
  'STP': '9999.9',
  'VISIB': '999.9',
  'WDSP': '999.9',
  'MXSPD': '999.9',
  'GUST': '999.9',
  'MAX': '9999.9',
  'MIN': '9999.9',
  'PRCP': '99.99',
  'SNDP': '999.9',
}
df = df.fillna(missing_markers)

# Preview the first rows after imputation
df.show(5)

In [6]:
# Country list: the table later joined to the stations on COUNTRY_ABBR
file_location = "/FileStore/tables/new/countrylist.csv"

# Read the CSV with a header row and let Spark infer the column types
df_country = (
  spark.read
  .options(header="true", delimiter=",", inferSchema="true")
  .csv(file_location)
)

# Preview the first rows
df_country.show(5)

In [7]:
# Station list: the table that carries STN_NO and COUNTRY_ABBR
file_location = "/FileStore/tables/new/stationlist.csv"

# Read the CSV with a header row and let Spark infer the column types
df_station = (
  spark.read
  .options(header="true", delimiter=",", inferSchema="true")
  .csv(file_location)
)

# Preview the first rows
df_station.show(5)

In [8]:
# Attach the country list columns to every station (left join keeps stations
# without a match), then cast STN_NO to an integer.
station_country = (
  df_station
  .join(df_country, on='COUNTRY_ABBR', how='left')
  .withColumn('STN_NO', col('STN_NO').cast('int'))
)
station_country.show(5)

In [9]:
# Attach station/country columns to each weather row by station number.
# Left join: weather rows without a matching station are kept.
station_match = df['STN---'] == station_country['STN_NO']
df_joined = df.join(station_country, on=station_match, how='left')
df_joined.show(5)

In [10]:
# Get answer to the questions
#
# Notes:
# - 9999.9 (TEMP) and 999.9 (WDSP) are the markers written for missing values
#   by the imputation cell, so those rows are excluded before averaging.
# - Rows whose station has no country (COUNTRY_FULL is null after the left
#   joins) are dropped: a null group is not a country and must not be
#   reported as an answer.
# - avg / max / count / row_number / ... are the pyspark.sql.functions
#   versions brought in by the star import in the first cell.

by_country = df_joined.filter(col('COUNTRY_FULL').isNotNull())

avg_temp = (
  by_country
  .filter('TEMP != 9999.9')
  .groupBy(col('COUNTRY_FULL'))
  .agg(avg('TEMP').alias('AVG_TEMP'))
  .sort(col('AVG_TEMP').desc())
)

# Question 1
print('Country had the hottest average mean temperature over the year:')
print(avg_temp.head(1))

# Question 2
# Sort ascending and take the first row instead of tail(1): tail() needs
# Spark 3.0+, while an ordered head(1) only has to keep one row.
print('Country had the coldest average mean temperature over the year:')
print(avg_temp.sort(col('AVG_TEMP').asc()).head(1))

# Question 3
print('Country had the second highest average mean wind speed over the year:')
avg_wdsp = (
  by_country
  .filter('WDSP != 999.9')
  .groupBy(col('COUNTRY_FULL'))
  .agg(avg('WDSP').alias('AVG_WDSP'))
  .sort(col('AVG_WDSP').desc())
)
# Only the top two rows are needed; avoid collecting every country to the
# driver, and do not crash when fewer than two countries are present.
top_two_wdsp = avg_wdsp.head(2)
print(top_two_wdsp[1] if len(top_two_wdsp) > 1 else None)

# Question 4
print('Country had the most consecutive days of tornadoes/funnel cloud formations:')
# Last character of FRSHTT is used as the tornado/funnel-cloud indicator.
# The column is kept on df_joined as before so later cells can still use it.
df_joined = df_joined.withColumn("lastchar", df_joined.FRSHTT.substr(-1,1))

# One row per (country, calendar day) on which at least one station in the
# country reported the indicator.
tornado_days = (
  df_joined
  .filter(col('COUNTRY_FULL').isNotNull())
  .filter("lastchar = '1'")
  .select(
    col('COUNTRY_FULL'),
    to_date(col('YEARMODA').cast('string'), 'yyyyMMdd').alias('OBS_DATE'),
  )
  .filter(col('OBS_DATE').isNotNull())
  .distinct()
)

# Gaps-and-islands: within a country, consecutive dates share the same value
# of (day number - row number), so that difference identifies each streak.
country_by_date = Window.partitionBy('COUNTRY_FULL').orderBy('OBS_DATE')
streaks = (
  tornado_days
  .withColumn('RN', row_number().over(country_by_date))
  .withColumn(
    'STREAK_ID',
    datediff(col('OBS_DATE'), to_date(lit('1970-01-01'))) - col('RN'),
  )
  .groupBy('COUNTRY_FULL', 'STREAK_ID')
  .agg(count('*').alias('STREAK_DAYS'))
)

# Longest streak per country, best country first (the previous version took
# head(1) of an unsorted total, which returned an arbitrary country).
longest_streak = (
  streaks
  .groupBy('COUNTRY_FULL')
  .agg(max('STREAK_DAYS').alias('MAX_CONSECUTIVE_DAYS'))
  .sort(col('MAX_CONSECUTIVE_DAYS').desc())
)
print(longest_streak.head(1))