In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import matplotlib.pyplot as plt
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, StringType, DoubleType
%matplotlib inline

In [2]:
# Create the Spark session with its resource settings supplied through the
# builder, i.e. *before* the context starts. Spark does not support changing
# these properties on a running context, so writing them into the private
# spark.sparkContext._conf after getOrCreate() does not resize anything.
# NOTE(review): if this kernel already owns a SparkContext (e.g. a managed
# Dataproc/Jupyter kernel), getOrCreate() reuses it and these settings are still
# ignored; driver memory in particular must be set at launch time
# (spark-submit flags / cluster properties) — confirm how this notebook is run.
spark = (
    SparkSession.builder
    .appName('weather')
    .config('spark.executor.memory', '5g')
    .config('spark.executor.cores', '4')
    .config('spark.cores.max', '4')
    .config('spark.driver.memory', '8g')
    .getOrCreate()
)

# Keep `conf` bound to a SparkConf reflecting the configuration actually in effect.
conf = spark.sparkContext.getConf()



In [3]:
# Shrink each yearly raw archive to a fixed, hand-picked set of columns and
# write it back out as gs://airline_project/<year>weather.csv.
# NOTE(review): the kept columns are hard-coded (intended to be the ones with
# < 50% missing values); this cell does not compute missingness itself.
# NOTE(review): Spark decompresses .gz but does not unpack tar archives —
# confirm that tar framing / per-member CSV headers do not leak into the rows.
year = ['2018', '2019', '2020', '2021', '2022']

keep_cols = [
    'Station', 'DATE', 'LATITUDE', 'LONGITUDE', 'ELEVATION', 'NAME',
    'REPORT_TYPE', 'SOURCE',
    'HourlyAltimeterSetting', 'HourlyDewPointTemperature',
    'HourlyDryBulbTemperature', 'HourlyRelativeHumidity',
    'HourlyStationPressure', 'HourlyVisibility',
    'HourlyWetBulbTemperature', 'HourlyWindDirection', 'HourlyWindSpeed',
    'REM',
]

for y in year:
    print(f'Starting to process {y} weather data')
    raw_path = f'gs://mscabigdata7fall2022/{y}.tar.gz'
    out_path = f'gs://airline_project/{y}weather.csv'

    reader = spark.read.options(quote='"', escape='"', ignoreLeadingWhiteSpace=True)
    df = reader.csv(raw_path, header=True, inferSchema=True)
    # Whatever the first header column is called, expose it as 'Station'.
    df = df.withColumnRenamed(df.columns[0], 'Station')

    trimmed = df.select(*keep_cols)
    trimmed.write.csv(out_path, mode="overwrite", header=True)
    print(f'Finish processing {y} weather data')

                                                                                

In [8]:
# Reload the trimmed 2022 file and peek at its first row.
weather2022 = (
    spark.read
    .options(quote='"', escape='"', ignoreLeadingWhiteSpace=True)
    .csv("gs://airline_project/2022weather.csv", header=True, inferSchema=True)
)
weather2022.show(1)

                                                                                

+-----------+-------------------+----------+----------+---------+-------------+-----------+------+----------------------+-------------------------+------------------------+----------------------+---------------------+----------------+------------------------+-------------------+---------------+--------------------+
|    Station|               DATE|  LATITUDE| LONGITUDE|ELEVATION|         NAME|REPORT_TYPE|SOURCE|HourlyAltimeterSetting|HourlyDewPointTemperature|HourlyDryBulbTemperature|HourlyRelativeHumidity|HourlyStationPressure|HourlyVisibility|HourlyWetBulbTemperature|HourlyWindDirection|HourlyWindSpeed|                 REM|
+-----------+-------------------+----------+----------+---------+-------------+-----------+------+----------------------+-------------------------+------------------------+----------------------+---------------------+----------------+------------------------+-------------------+---------------+--------------------+
|38774099999|2022-01-01T02:00:00|38.4333333|57.41

In [9]:
# Per-column fraction of 2022 rows that are null or NaN.
# NOTE(review): isnan() is defined for float/double; on non-numeric columns this
# presumably relies on Spark's implicit cast to double (a timestamp/date column
# would fail analysis) — confirm the inferred schema.
total_rows = count(lit(1))
missing_exprs = []
for col_name in weather2022.columns:
    is_missing = isnan(col_name) | col(col_name).isNull()
    missing_exprs.append((count(when(is_missing, col_name)) / total_rows).alias(col_name))

amount_missing_2022 = weather2022.select(missing_exprs)
amount_missing_2022.show()



+-------+----+--------------------+--------------------+--------------------+--------------------+-----------+------+----------------------+-------------------------+------------------------+----------------------+---------------------+-------------------+------------------------+-------------------+-------------------+------------------+
|Station|DATE|            LATITUDE|           LONGITUDE|           ELEVATION|                NAME|REPORT_TYPE|SOURCE|HourlyAltimeterSetting|HourlyDewPointTemperature|HourlyDryBulbTemperature|HourlyRelativeHumidity|HourlyStationPressure|   HourlyVisibility|HourlyWetBulbTemperature|HourlyWindDirection|    HourlyWindSpeed|               REM|
+-------+----+--------------------+--------------------+--------------------+--------------------+-----------+------+----------------------+-------------------------+------------------------+----------------------+---------------------+-------------------+------------------------+-------------------+-----------------

                                                                                

In [10]:
# Reload the trimmed 2021 file and show, per column, the fraction of rows that
# are null or NaN.
weather2021 = (
    spark.read
    .options(quote='"', escape='"', ignoreLeadingWhiteSpace=True)
    .csv("gs://airline_project/2021weather.csv", header=True, inferSchema=True)
)
row_total = count(lit(1))
weather2021.select(*[
    (count(when(isnan(name) | col(name).isNull(), name)) / row_total).alias(name)
    for name in weather2021.columns
]).show()



+-------+----+--------------------+--------------------+--------------------+--------------------+-----------+------+----------------------+-------------------------+------------------------+----------------------+---------------------+-------------------+------------------------+-------------------+-------------------+-------------------+
|Station|DATE|            LATITUDE|           LONGITUDE|           ELEVATION|                NAME|REPORT_TYPE|SOURCE|HourlyAltimeterSetting|HourlyDewPointTemperature|HourlyDryBulbTemperature|HourlyRelativeHumidity|HourlyStationPressure|   HourlyVisibility|HourlyWetBulbTemperature|HourlyWindDirection|    HourlyWindSpeed|                REM|
+-------+----+--------------------+--------------------+--------------------+--------------------+-----------+------+----------------------+-------------------------+------------------------+----------------------+---------------------+-------------------+------------------------+-------------------+---------------

                                                                                

In [None]:
# Reload the trimmed 2020 file and show, per column, the fraction of rows that
# are null or NaN.
weather2020 = (
    spark.read
    .options(quote='"', escape='"', ignoreLeadingWhiteSpace=True)
    .csv("gs://airline_project/2020weather.csv", header=True, inferSchema=True)
)
row_total = count(lit(1))
weather2020.select(*[
    (count(when(isnan(name) | col(name).isNull(), name)) / row_total).alias(name)
    for name in weather2020.columns
]).show()



+-------+----+--------------------+--------------------+--------------------+--------------------+-----------+------+----------------------+-------------------------+------------------------+----------------------+---------------------+-------------------+------------------------+-------------------+-------------------+------------------+
|Station|DATE|            LATITUDE|           LONGITUDE|           ELEVATION|                NAME|REPORT_TYPE|SOURCE|HourlyAltimeterSetting|HourlyDewPointTemperature|HourlyDryBulbTemperature|HourlyRelativeHumidity|HourlyStationPressure|   HourlyVisibility|HourlyWetBulbTemperature|HourlyWindDirection|    HourlyWindSpeed|               REM|
+-------+----+--------------------+--------------------+--------------------+--------------------+-----------+------+----------------------+-------------------------+------------------------+----------------------+---------------------+-------------------+------------------------+-------------------+-----------------

                                                                                

In [12]:
# Reload the trimmed 2019 file and show, per column, the fraction of rows that
# are null or NaN.
weather2019 = (
    spark.read
    .options(quote='"', escape='"', ignoreLeadingWhiteSpace=True)
    .csv("gs://airline_project/2019weather.csv", header=True, inferSchema=True)
)
row_total = count(lit(1))
weather2019.select(*[
    (count(when(isnan(name) | col(name).isNull(), name)) / row_total).alias(name)
    for name in weather2019.columns
]).show()




+-------+----+--------------------+--------------------+--------------------+--------------------+-----------+------+----------------------+-------------------------+------------------------+----------------------+---------------------+------------------+------------------------+-------------------+-------------------+-------------------+
|Station|DATE|            LATITUDE|           LONGITUDE|           ELEVATION|                NAME|REPORT_TYPE|SOURCE|HourlyAltimeterSetting|HourlyDewPointTemperature|HourlyDryBulbTemperature|HourlyRelativeHumidity|HourlyStationPressure|  HourlyVisibility|HourlyWetBulbTemperature|HourlyWindDirection|    HourlyWindSpeed|                REM|
+-------+----+--------------------+--------------------+--------------------+--------------------+-----------+------+----------------------+-------------------------+------------------------+----------------------+---------------------+------------------+------------------------+-------------------+------------------

                                                                                

In [13]:
# Reload the trimmed 2018 file and show, per column, the fraction of rows that
# are null or NaN.
weather2018 = (
    spark.read
    .options(quote='"', escape='"', ignoreLeadingWhiteSpace=True)
    .csv("gs://airline_project/2018weather.csv", header=True, inferSchema=True)
)
row_total = count(lit(1))
weather2018.select(*[
    (count(when(isnan(name) | col(name).isNull(), name)) / row_total).alias(name)
    for name in weather2018.columns
]).show()




+-------+----+-------------------+-------------------+-------------------+-------------------+-----------+------+----------------------+-------------------------+------------------------+----------------------+---------------------+------------------+------------------------+-------------------+------------------+-------------------+
|Station|DATE|           LATITUDE|          LONGITUDE|          ELEVATION|               NAME|REPORT_TYPE|SOURCE|HourlyAltimeterSetting|HourlyDewPointTemperature|HourlyDryBulbTemperature|HourlyRelativeHumidity|HourlyStationPressure|  HourlyVisibility|HourlyWetBulbTemperature|HourlyWindDirection|   HourlyWindSpeed|                REM|
+-------+----+-------------------+-------------------+-------------------+-------------------+-----------+------+----------------------+-------------------------+------------------------+----------------------+---------------------+------------------+------------------------+-------------------+------------------+-------------

                                                                                

In [15]:
# Stack the five yearly frames into one and persist it.
# unionByName matches columns by name instead of by position, so any difference
# in column order between the yearly files raises or aligns correctly instead of
# silently shifting values into the wrong column. The result keeps weather2018's
# column order.
weather = (
    weather2018
    .unionByName(weather2019)
    .unionByName(weather2020)
    .unionByName(weather2021)
    .unionByName(weather2022)
)
weather.write.csv('gs://airline_project/weather.csv', mode="overwrite", header=True)

                                                                                

In [16]:
weather.schema.names  # column names of the combined frame, in order

['Station',
 'DATE',
 'LATITUDE',
 'LONGITUDE',
 'ELEVATION',
 'NAME',
 'REPORT_TYPE',
 'SOURCE',
 'HourlyAltimeterSetting',
 'HourlyDewPointTemperature',
 'HourlyDryBulbTemperature',
 'HourlyRelativeHumidity',
 'HourlyStationPressure',
 'HourlyVisibility',
 'HourlyWetBulbTemperature',
 'HourlyWindDirection',
 'HourlyWindSpeed',
 'REM']

In [20]:
n_weather_rows = weather.count()  # rows in the combined 2018-2022 frame
n_weather_rows

                                                                                

624571145

In [17]:
# change data types: parse DATE into a timestamp and cast the measurement
# columns to double, all in a single projection.
from pyspark.sql.functions import col, to_timestamp
from pyspark.sql.types import IntegerType, BooleanType, StringType, DateType, DoubleType

# Columns cast to DoubleType; every other column (Station, NAME, REPORT_TYPE,
# SOURCE, ...) is passed through unchanged.
# NOTE(review): REM is cast to double like the measurements. If REM holds
# free-text remarks (confirm against the source data), every value becomes null.
double_cols = {
    "LATITUDE", "LONGITUDE", "ELEVATION",
    "HourlyAltimeterSetting", "HourlyDewPointTemperature",
    "HourlyDryBulbTemperature", "HourlyRelativeHumidity",
    "HourlyStationPressure", "HourlyVisibility",
    "HourlyWetBulbTemperature", "HourlyWindDirection",
    "HourlyWindSpeed", "REM",
}


def _typed_column(name):
    """Return the expression for column `name` with its target type applied."""
    if name == "DATE":
        return to_timestamp(col(name)).alias(name)
    if name in double_cols:
        return col(name).cast(DoubleType()).alias(name)
    return col(name)


# One select instead of a chain of withColumn calls: each withColumn adds its own
# projection to the logical plan, and Spark recommends select() for many columns.
# Iterating weather.columns keeps the original column order and names.
weather = weather.select([_typed_column(c) for c in weather.columns])
weather.printSchema()

root
 |-- Station: string (nullable = true)
 |-- DATE: timestamp (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- ELEVATION: double (nullable = true)
 |-- NAME: string (nullable = true)
 |-- REPORT_TYPE: string (nullable = true)
 |-- SOURCE: string (nullable = true)
 |-- HourlyAltimeterSetting: double (nullable = true)
 |-- HourlyDewPointTemperature: double (nullable = true)
 |-- HourlyDryBulbTemperature: double (nullable = true)
 |-- HourlyRelativeHumidity: double (nullable = true)
 |-- HourlyStationPressure: double (nullable = true)
 |-- HourlyVisibility: double (nullable = true)
 |-- HourlyWetBulbTemperature: double (nullable = true)
 |-- HourlyWindDirection: double (nullable = true)
 |-- HourlyWindSpeed: double (nullable = true)
 |-- REM: double (nullable = true)



In [18]:
# Add a 'Day' column: the calendar date of each DATE timestamp, used as the
# daily grouping key below.
from pyspark.sql.functions import to_date
day_of_obs = to_date(col('DATE'))
weather = weather.withColumn('Day', day_of_obs)

In [19]:
weather.show(n=1)  # first row, now including the Day column

+-----------+-------------------+--------+-----------+---------+--------+-----------+------+----------------------+-------------------------+------------------------+----------------------+---------------------+----------------+------------------------+-------------------+---------------+----+----------+
|    Station|               DATE|LATITUDE|  LONGITUDE|ELEVATION|    NAME|REPORT_TYPE|SOURCE|HourlyAltimeterSetting|HourlyDewPointTemperature|HourlyDryBulbTemperature|HourlyRelativeHumidity|HourlyStationPressure|HourlyVisibility|HourlyWetBulbTemperature|HourlyWindDirection|HourlyWindSpeed| REM|       Day|
+-----------+-------------------+--------+-----------+---------+--------+-----------+------+----------------------+-------------------------+------------------------+----------------------+---------------------+----------------+------------------------+-------------------+---------------+----+----------+
|83811099999|2018-01-01 09:00:00|   -24.4|-50.8333333|    808.0|IVAI, BR|      FM-

In [22]:
# Collapse to one row per (Station, Day) by averaging each numeric column.
# Aggregates are deliberately left un-aliased: later cells select the default
# names such as 'avg(LATITUDE)'.
from pyspark.sql import functions as F

daily_avg_cols = [
    'LATITUDE', 'LONGITUDE', 'ELEVATION',
    'HourlyAltimeterSetting', 'HourlyDewPointTemperature',
    'HourlyDryBulbTemperature', 'HourlyRelativeHumidity',
    'HourlyStationPressure', 'HourlyVisibility',
    'HourlyWetBulbTemperature', 'HourlyWindDirection',
    'HourlyWindSpeed', 'REM',
]
weather_agg = weather.groupBy('Station', 'Day').agg(*[F.avg(c) for c in daily_avg_cols])

In [None]:
weather_agg.show(n=1)  # first (Station, Day) aggregate row

22/11/08 22:19:49 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 43:>                                                         (0 + 1) / 1]

+-----------+----------+------------------+------------------+------------------+---------------------------+------------------------------+-----------------------------+---------------------------+--------------------------+---------------------+-----------------------------+------------------------+--------------------+--------+
|    Station|       Day|     avg(LATITUDE)|    avg(LONGITUDE)|    avg(ELEVATION)|avg(HourlyAltimeterSetting)|avg(HourlyDewPointTemperature)|avg(HourlyDryBulbTemperature)|avg(HourlyRelativeHumidity)|avg(HourlyStationPressure)|avg(HourlyVisibility)|avg(HourlyWetBulbTemperature)|avg(HourlyWindDirection)|avg(HourlyWindSpeed)|avg(REM)|
+-----------+----------+------------------+------------------+------------------+---------------------------+------------------------------+-----------------------------+---------------------------+--------------------------+---------------------+-----------------------------+------------------------+--------------------+--------+
|

                                                                                

In [None]:
# Persist the daily aggregate (one row per Station/Day) as CSV with a header row.
(weather_agg.write
    .mode("overwrite")
    .option("header", True)
    .csv('gs://airline_project/weather_day.csv'))

                                                                                

In [28]:
# Spot-check one station to confirm its averaged latitude/longitude look sensible.
sample_station = weather_agg.filter(col('Station') == '38774099999')
sample_station.show(1)



+-----------+----------+-----------------+-----------------+--------------+---------------------------+------------------------------+-----------------------------+---------------------------+--------------------------+---------------------+-----------------------------+------------------------+--------------------+--------+
|    Station|       Day|    avg(LATITUDE)|   avg(LONGITUDE)|avg(ELEVATION)|avg(HourlyAltimeterSetting)|avg(HourlyDewPointTemperature)|avg(HourlyDryBulbTemperature)|avg(HourlyRelativeHumidity)|avg(HourlyStationPressure)|avg(HourlyVisibility)|avg(HourlyWetBulbTemperature)|avg(HourlyWindDirection)|avg(HourlyWindSpeed)|avg(REM)|
+-----------+----------+-----------------+-----------------+--------------+---------------------------+------------------------------+-----------------------------+---------------------------+--------------------------+---------------------+-----------------------------+------------------------+--------------------+--------+
|38774099999|2018-0

                                                                                

In [None]:
n_station_days = weather_agg.count()  # number of (Station, Day) rows
n_station_days

                                                                                

21464177

In [87]:
# Distinct (Station, avg latitude, avg longitude) combinations. A station can
# appear more than once if its daily-averaged coordinates are not identical.
station_coords = weather_agg.select('Station', 'avg(LATITUDE)', 'avg(LONGITUDE)')
station_list = station_coords.dropDuplicates()
print(station_list.count())



209250


                                                                                

In [94]:
# NOTE(review): the saved output (Latitude/Longitude columns) was produced after
# the per-station averaging cell had run — this cell was executed out of order.
station_list.show(n=1)



+-----------+----------+------------------+
|    Station|  Latitude|         Longitude|
+-----------+----------+------------------+
|71867099999|53.9666666|-101.1000000000001|
+-----------+----------+------------------+
only showing top 1 row





In [93]:
# One row per station: average its distinct (lat, lon) pairs, in case the same
# station reports different coordinates on different days/years.
# Rebuilt from weather_agg rather than from the current station_list, so
# re-running this cell does not fail (the avg(...) columns would already have
# been replaced by Latitude/Longitude). The first-run result is unchanged.
# NOTE(review): a plain mean of longitudes is wrong for a station whose values
# straddle +/-180 degrees.
station_list = (
    weather_agg
    .select('Station', 'avg(LATITUDE)', 'avg(LONGITUDE)')
    .distinct()
    .groupby('Station')
    .agg(F.avg('avg(LATITUDE)').alias('Latitude'),
         F.avg('avg(LONGITUDE)').alias('Longitude'))
)

In [98]:
# Persist the per-station coordinate table as CSV with a header row.
(station_list.write
    .mode("overwrite")
    .option("header", True)
    .csv('gs://airline_project/station_list.csv'))

                                                                                

In [96]:
# Check that every station appears exactly once: this row count should equal
# the distinct-station count computed next.
n_station_rows = station_list.count()
n_station_rows

                                                                                

82015

In [97]:
station_list.dropDuplicates(['Station']).count()  # number of distinct stations

                                                                                

82015

In [95]:
# List any station that still has more than one row (expected: empty table).
(station_list
    .groupBy('Station')
    .count()
    .where(col('count') > 1)
    .show())



+-------+-----+
|Station|count|
+-------+-----+
+-------+-----+



                                                                                

In [6]:
# Reload the daily weather aggregate.
# NOTE(review): the aggregate was written to gs://airline_project/weather_day.csv
# above, but this reads gs://mscabigdata7fall2022/weather_day.csv/ — confirm the
# data was copied there; otherwise this loads a different (or missing) dataset.
# NOTE(review): this cell's execution count (In [6]) is lower than the cells
# above, i.e. it was run out of order relative to them.
df = (
    spark.read
    .options(quote='"', escape='"', ignoreLeadingWhiteSpace=True)
    .csv("gs://mscabigdata7fall2022/weather_day.csv/", header=True, inferSchema=True)
)

                                                                                