In [2]:
# 1. Correlation between latitude and average temperature
# 2. Min temperature for each country in the month of May each year.
# 3. Standard deviation of min temperature for each country in the month of may each year.
# 4. 5 windiest  days in Belgium
# 5. Difference of each country’s average temperature from global average temperature for 2021.  (Window Function)
# 6. Rolling average of temperature in Canada for the month of December 2020 (Window Function)
# 7. Find hottest and coldest day of Australia for each month in 2019.
# 8. Find 3 windiest days of Canada for each month  in 2020.
# 9. Highest temperatures for each continent (join) and the country, date on which it was recorded.

In [9]:
#Import all necessary libraries

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import SparkSession , functions as fun, Window as Wd

In [10]:
# Creating a SparkSession in Python

spark = SparkSession.builder.appName('weather_history')\
    .config('spark.driver.extraClassPath', '/usr/lib/jvm/java-17-openjdk-amd64/lib/postgresql-42.5.0.jar')\
    .getOrCreate()


In [11]:
# Read csv file
historical_weather_df = spark.read.option("header",True) \
     .csv("apache_spark_project/cleaned_data/cleaned_weather_data.csv")

In [12]:
historical_weather_df.show()

+----------+--------+-------+---------+---------+----+----+----+----+------+
|      date| country|   city| Latitude|Longitude|tavg|tmin|tmax|wspd|  pres|
+----------+--------+-------+---------+---------+----+----+----+----+------+
|2018-07-21|Abkhazia|Sukhumi|43.001525|41.023415|23.4|20.9|25.5| 9.3|1009.6|
|2018-07-22|Abkhazia|Sukhumi|43.001525|41.023415|23.5|21.0|25.7| 9.4|1010.0|
|2018-07-23|Abkhazia|Sukhumi|43.001525|41.023415|23.5|21.1|25.5| 8.2|1007.7|
|2018-07-24|Abkhazia|Sukhumi|43.001525|41.023415|24.3|20.8|27.1| 9.3|1004.4|
|2018-07-25|Abkhazia|Sukhumi|43.001525|41.023415|26.5|22.7|30.0| 9.7|1002.0|
|2018-07-26|Abkhazia|Sukhumi|43.001525|41.023415|26.7|24.6|28.7| 9.8|1006.7|
|2018-07-27|Abkhazia|Sukhumi|43.001525|41.023415|26.5|24.6|28.1| 8.4|1009.1|
|2018-07-28|Abkhazia|Sukhumi|43.001525|41.023415|26.4|24.5|28.2| 8.6|1007.5|
|2018-07-29|Abkhazia|Sukhumi|43.001525|41.023415|26.3|24.4|28.1| 9.3|1007.0|
|2018-07-30|Abkhazia|Sukhumi|43.001525|41.023415|26.5|24.5|28.4| 9.3|1007.4|

In [16]:
historical_weather_df.printSchema()

root
 |-- date: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- tavg: string (nullable = true)
 |-- tmin: string (nullable = true)
 |-- tmax: string (nullable = true)
 |-- wspd: string (nullable = true)
 |-- pres: string (nullable = true)



In [17]:
# Read csv file
country_continent_df = spark.read.option("header",True) \
     .csv("apache_spark_project/cleaned_data/cleaned_country_continent.csv")

In [18]:
country_continent_df.show()

+-------------------+---------+
|            country|continent|
+-------------------+---------+
|        Afghanistan|     Asia|
|      �land Islands|   Europe|
|            Albania|   Europe|
|            Algeria|   Africa|
|     American Samoa|  Oceania|
|            Andorra|   Europe|
|             Angola|   Africa|
|           Anguilla| Americas|
|Antigua and Barbuda| Americas|
|          Argentina| Americas|
|            Armenia|     Asia|
|              Aruba| Americas|
|          Australia|  Oceania|
|            Austria|   Europe|
|         Azerbaijan|     Asia|
|            Bahamas| Americas|
|            Bahrain|     Asia|
|         Bangladesh|     Asia|
|           Barbados| Americas|
|            Belarus|   Europe|
+-------------------+---------+
only showing top 20 rows



1. Correlation between latitude and average temperature

In [19]:
corr_df = historical_weather_df.select(fun.corr('Latitude', 'tavg'))

In [20]:
corr_df.show()

+--------------------+
|corr(Latitude, tavg)|
+--------------------+
|-0.45070195626748133|
+--------------------+



 2. Min temperature for each country in the month of May each year.

In [21]:
min_temp_may_df = historical_weather_df.groupBy(fun.year('date').alias('year'), fun.month('date').alias('month'), 'country').agg(fun.min('tmin').alias('min_temp')).filter('month == 05').orderBy('country', 'year')

min_temp_may_df.show()

+----+-----+--------------+--------+
|year|month|       country|min_temp|
+----+-----+--------------+--------+
|2019|    5|      Abkhazia|     0.9|
|2020|    5|      Abkhazia|    10.3|
|2021|    5|      Abkhazia|    10.2|
|2022|    5|      Abkhazia|    10.4|
|2018|    5| Aland Islands|    -0.6|
|2019|    5| Aland Islands|    -0.2|
|2020|    5| Aland Islands|    -0.3|
|2021|    5| Aland Islands|    -1.1|
|2022|    5| Aland Islands|    -0.3|
|2018|    5|       Albania|    11.1|
|2019|    5|       Albania|    10.1|
|2020|    5|       Albania|    10.7|
|2021|    5|       Albania|    11.6|
|2022|    5|       Albania|    10.2|
|2018|    5|       Algeria|    11.1|
|2019|    5|       Algeria|    13.7|
|2020|    5|       Algeria|    12.7|
|2021|    5|       Algeria|    13.1|
|2022|    5|       Algeria|    10.0|
|2018|    5|American Samoa|    22.6|
+----+-----+--------------+--------+
only showing top 20 rows



3. Standard deviation of min temperature for each country in the month of may each year.

In [26]:
stddev_min_temp_may_df = min_temp_may_df\
                            .groupBy('country')\
                            .agg(fun.stddev('min_temp').alias('stddev'))\
                            .orderBy('country')\
                            .select('country',fun.round('stddev', 1).alias('stddev_min_temp_may'))

stddev_min_temp_may_df.show()

+-------------------+-------------------+
|            country|stddev_min_temp_may|
+-------------------+-------------------+
|           Abkhazia|                4.7|
|      Aland Islands|                0.4|
|            Albania|                0.6|
|            Algeria|                1.5|
|     American Samoa|                0.8|
|             Angola|                0.7|
|           Anguilla|                0.6|
|Antigua and Barbuda|                1.6|
|          Argentina|                0.3|
|              Aruba|                2.3|
|            Austria|                0.6|
|         Azerbaijan|                0.5|
|            Bahamas|                1.0|
|            Bahrain|                0.8|
|         Bangladesh|                1.3|
|           Barbados|                0.4|
|            Belarus|                5.5|
|            Belgium|                4.7|
|            Bermuda|                1.2|
|      Bouvet Island|                0.2|
+-------------------+-------------

4. 5 windiest  days in Belgium

In [40]:
# Window Function
window_spec = Wd.partitionBy('country').orderBy('wspd')
ranked_df = historical_weather_df\
                .withColumn('dense_rn', fun.dense_rank().over(window_spec))\
                .filter('dense_rn <= 5')\
                .filter('country == "Belgium"')

windiest_belgium_df = ranked_df.select('date', 'country', col('wspd').alias('avg_wind'), col('dense_rn').alias('rank'))
windiest_belgium_df.show(5)

+----------+-------+--------+----+
|      date|country|avg_wind|rank|
+----------+-------+--------+----+
|2021-07-05|Belgium|    10.0|   1|
|2022-01-11|Belgium|    10.0|   1|
|2022-02-11|Belgium|    10.0|   1|
|2018-04-29|Belgium|    10.1|   2|
|2018-05-29|Belgium|    10.1|   2|
+----------+-------+--------+----+
only showing top 5 rows



5. Difference of each country’s average temperature from global average temperature for 2021.  (Window Function)

In [15]:
avg_temp_df = historical_weather_df\
                .filter(fun.year('date') == 2021)\
                .groupBy('country')\
                .agg(fun.mean('tavg').alias('avg_temp'))\
                .orderBy('country')

avg_temp_df.show()

global_avg_temp = round(historical_weather_df.select(fun.mean('tavg')).collect()[0][0], 1)
global_avg_temp # 20.9

diff_global_avg_temp_df = avg_temp_df\
                            .withColumn('year', fun.lit(2021))\
                            .withColumn('avg_temp',fun.round('avg_temp', 1))\
                            .withColumn('global_avg_temp', fun.lit(global_avg_temp))\
                            .withColumn('difference', fun.round(col('avg_temp') - col('global_avg_temp'), 1))

diff_global_avg_temp_df = diff_global_avg_temp_df.select('year', 'country', 'avg_temp', 'global_avg_temp', 'difference')

diff_global_avg_temp_df.show()

+-------------------+-------------------+
|            country|           avg_temp|
+-------------------+-------------------+
|           Abkhazia|  15.08465753424658|
|      Aland Islands|  6.564285714285716|
|            Albania| 16.885164835164826|
|            Algeria| 19.529863013698623|
|     American Samoa|  27.16657534246574|
|             Angola| 25.747945205479454|
|           Anguilla| 26.690958904109575|
|Antigua and Barbuda|   26.8358904109589|
|          Argentina| 17.718082191780802|
|              Aruba| 27.780547945205477|
|            Austria| 12.865109890109885|
|         Azerbaijan| 15.511232876712326|
|            Bahamas| 25.486538461538434|
|            Bahrain| 28.202191780821906|
|         Bangladesh| 26.944109589041116|
|           Barbados|  27.23013698630137|
|            Belarus| 7.8641095890410995|
|            Belgium| 10.930494505494517|
|            Bermuda| 22.173076923076913|
|      Bouvet Island|-0.8765363128491619|
+-------------------+-------------

6. Rolling average of temperature in Canada for the month of December 2020 (Window Function)

In [16]:
window_spec = Wd.partitionBy('country').orderBy('date').rowsBetween(Wd.unboundedPreceding, Wd.currentRow)

canada_december_2020_df = historical_weather_df\
                        .filter(fun.year('date') == 2020)\
                        .filter(fun.month('date') == 12)\
                        .filter('country == "Canada"')\
                        .withColumn('rolling_avg', fun.mean('tavg').over(window_spec))\
                        .select('date', 'country', 'tavg', fun.round('rolling_avg', 1).alias('rolling_avg'))

canada_december_2020_df.show()

+----------+-------+-----+-----------+
|      date|country| tavg|rolling_avg|
+----------+-------+-----+-----------+
|2020-12-01| Canada|  3.1|        3.1|
|2020-12-02| Canada| -2.2|        0.4|
|2020-12-03| Canada| -0.7|        0.1|
|2020-12-04| Canada|  1.9|        0.5|
|2020-12-05| Canada| -1.1|        0.2|
|2020-12-06| Canada| -2.8|       -0.3|
|2020-12-07| Canada| -4.4|       -0.9|
|2020-12-08| Canada| -6.7|       -1.6|
|2020-12-09| Canada| -1.9|       -1.6|
|2020-12-10| Canada|  0.9|       -1.4|
|2020-12-11| Canada|  1.9|       -1.1|
|2020-12-12| Canada|  1.9|       -0.8|
|2020-12-13| Canada|  1.7|       -0.6|
|2020-12-14| Canada| -2.9|       -0.8|
|2020-12-15| Canada|-10.4|       -1.4|
|2020-12-16| Canada|-14.7|       -2.3|
|2020-12-17| Canada| -8.8|       -2.7|
|2020-12-18| Canada| -8.7|       -3.0|
|2020-12-19| Canada| -8.4|       -3.3|
|2020-12-20| Canada| -1.2|       -3.2|
+----------+-------+-----+-----------+
only showing top 20 rows



7. Find hottest and coldest day of Belgium for each month in 2019.

In [17]:
window_spec1 = Wd.partitionBy('country', fun.year('date'), fun.month('date')).orderBy('tmin')
window_spec2 = Wd.partitionBy('country', fun.year('date'), fun.month('date')).orderBy(fun.desc('tmax'))

historical_weather_df\
        .filter('country == "Belgium"')\
        .filter(fun.year('date') == 2019)\
        .withColumn('min_rank', fun.dense_rank().over(window_spec1))\
        .withColumn('max_rank', fun.dense_rank().over(window_spec2))\
        .withColumn('is_min_or_max', fun.when(col('min_rank') == 1, fun.lit('min')).when(col('max_rank') == 1, fun.lit('max')))\
        .filter((col('min_rank') == 1) | (col('max_rank') == 1))\
        .select(fun.to_date('date').alias('date'), 'country', 'tmin', 'tmax', 'is_min_or_max')\
        .show()


+----------+-------+----+----+-------------+
|      date|country|tmin|tmax|is_min_or_max|
+----------+-------+----+----+-------------+
|2019-01-13|Belgium| 6.9| 9.7|          max|
|2019-01-29|Belgium|-0.6| 4.9|          min|
|2019-02-07|Belgium| 5.9| 9.9|          max|
|2019-02-03|Belgium|-0.1| 5.9|          min|
|2019-03-13|Belgium| 4.6| 9.9|          max|
|2019-03-19|Belgium| 0.1|11.2|          min|
|2019-04-03|Belgium| 3.6| 9.8|          max|
|2019-04-14|Belgium|-0.2| 9.7|          min|
|2019-05-24|Belgium|12.1|22.4|          max|
|2019-05-06|Belgium| 1.8|11.7|          min|
|2019-06-29|Belgium|16.8|32.8|          max|
|2019-06-14|Belgium|10.7|23.2|          min|
|2019-07-25|Belgium|23.7|39.9|          max|
|2019-07-09|Belgium|10.4|21.3|          min|
|2019-08-27|Belgium|19.8|33.5|          max|
|2019-08-14|Belgium|10.9|20.8|          min|
|2019-09-21|Belgium| 9.0|27.3|          max|
|2019-09-05|Belgium|10.8|17.2|          min|
|2019-10-13|Belgium|15.0|23.4|          max|
|2019-10-3

8. Find 4 windiest days of Canada for each month in 2021.

In [39]:
window_spec = Wd.partitionBy('country', fun.year('date'), fun.month('date')).orderBy('wspd')

canada_2021_windiest_df=historical_weather_df\
        .filter('country == "Canada"')\
        .filter(fun.year('date') == 2021)\
        .withColumn('max_rank', fun.dense_rank().over(window_spec))\
        .select(fun.to_date('date').alias('date'), 'country', 'wspd')
canada_2021_windiest_df.show(4)

+----------+-------+----+
|      date|country|wspd|
+----------+-------+----+
|2021-01-01| Canada| 6.9|
|2021-01-02| Canada|13.5|
|2021-01-03| Canada| 7.5|
|2021-01-04| Canada| 5.1|
+----------+-------+----+
only showing top 4 rows



9. Highest temperatures for each continent (join) and the country, date on which it was recorded.

In [20]:
join_expr = historical_weather_df['country'] == country_continent_df['country']

joined_df = historical_weather_df.join(fun.broadcast(country_continent_df), join_expr) # since country dataset has a small size, we use broadcast join

window_spec = Wd.partitionBy('continent').orderBy('tmax')

highest_temp_continent_df = joined_df\
                            .withColumn('dense_rnk', fun.dense_rank().over(window_spec))\
                            .filter('dense_rnk == 1')\
                            .select('continent', 'date', historical_weather_df['country'], col('tmax').alias('max_temp'))

highest_temp_continent_df.show()

+---------+----------+-------------+--------+
|continent|      date|      country|max_temp|
+---------+----------+-------------+--------+
|   Africa|2021-01-02|      Algeria|    10.3|
|   Africa|2019-12-30|        Libya|    10.3|
|   Europe|2018-11-21|      Belarus|    -0.1|
|   Europe|2019-01-31|      Belgium|    -0.1|
|   Europe|2019-01-23|      Croatia|    -0.1|
|   Europe|2018-01-10|      Estonia|    -0.1|
|   Europe|2018-11-27|      Estonia|    -0.1|
|   Europe|2019-02-12|      Estonia|    -0.1|
|   Europe|2021-01-31|Faroe Islands|    -0.1|
|   Europe|2018-02-15|      Finland|    -0.1|
|   Europe|2018-03-21|      Finland|    -0.1|
|   Europe|2018-12-20|      Finland|    -0.1|
|   Europe|2019-01-06|      Finland|    -0.1|
|   Europe|2021-01-12|      Finland|    -0.1|
|   Europe|2019-01-22|      Germany|    -0.1|
|   Europe|2022-01-11|      Germany|    -0.1|
|   Europe|2020-01-18|      Hungary|    -0.1|
|   Europe|2018-02-08|      Iceland|    -0.1|
|   Europe|2018-02-09|      Icelan