<a href="https://colab.research.google.com/github/lb424/Spark-DF-Implementation/blob/main/Spark-Implementation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

*Requires the data sets city_temperature.csv and country-list.csv to be located in the Google Drive directory MyDrive/colab_inputs/A4/:

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, udf, broadcast, concat_ws
from pyspark.sql.types import DoubleType, StringType, FloatType
from google.colab import drive

spark = SparkSession.builder.appName("A4").getOrCreate()

drive.mount('/content/drive')

temperature_df = spark.read.options(header='true', inferSchema='true').csv('/content/drive/MyDrive/colab_inputs/A4/city_temperature.csv')
country_list_df = spark.read.options(header='true', inferSchema='true').csv('/content/drive/MyDrive/colab_inputs/A4/country-list.csv')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Task A - Find the average of AvgTemperature for each Region

In [None]:
task_a = temperature_df.groupBy("Region").agg(avg("AvgTemperature").alias("AverageTemperature"))
task_a.show()

result_a = task_a.select(concat_ws(", ", task_a.Region, task_a.AverageTemperature.cast("string")).alias("output"))
result_a.coalesce(1).write.mode("overwrite").format("text").save("/content/drive/MyDrive/colab_outputs/A4/result_a")

+--------------------+------------------+
|              Region|AverageTemperature|
+--------------------+------------------+
|              Europe| 46.69628524306878|
|              Africa| 53.54951656193528|
|       North America|55.300932625245395|
|         Middle East|  68.3845217196125|
|South/Central Ame...|62.189438801074665|
|                Asia| 62.56864868961511|
|Australia/South P...|61.180869127275976|
+--------------------+------------------+



Task B - Find the average of AvgTemperature by Year for countries only located in the “Europe” Region.

In [None]:
task_b = temperature_df.filter(temperature_df.Region == "Europe").groupBy("Year").agg(avg("AvgTemperature").alias("AverageTemperature"))
task_b.show()

result_b = task_b.select(concat_ws(", ", task_b.Year, task_b.AverageTemperature.cast("string")).alias("output"))
result_b.coalesce(1).write.mode("overwrite").format("text").save("/content/drive/MyDrive/colab_outputs/A4/result_b")

+----+------------------+
|Year|AverageTemperature|
+----+------------------+
|2003| 43.50427513880328|
|2007| 50.78712918660323|
|2018| 50.20741891208571|
|2015| 51.77580680895058|
|2006|48.090954307314625|
|2013| 51.17017400962613|
|1997|  41.2918051750382|
|2014| 52.33811181044038|
|2019| 49.60529547753217|
|2004| 47.62057117070647|
|1996| 36.53758955676995|
|1998| 40.93638356164395|
|2020| 44.80030272452078|
|2012| 49.11577509347122|
|2009| 49.92103821394317|
|2016|50.392416186678325|
|1995| 38.63447394057469|
| 200|             -99.0|
|2001|44.922228310502405|
|2005| 48.85939986953682|
+----+------------------+
only showing top 20 rows
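
The row for year 200 with an average of exactly -99.0 suggests that some records hold placeholder values rather than real measurements. Below is a minimal plain-Python sketch of how such values pull an average down, assuming -99 marks a missing reading (an assumption, not something this notebook confirms). The sample rows and the function name `average_by_year` are made up for illustration.

```python
from collections import defaultdict

MISSING = -99.0  # assumed placeholder for a missing reading (not confirmed by the source)

def average_by_year(rows, skip_missing=True):
    """Average temperature per year from (year, temperature) pairs."""
    totals = defaultdict(lambda: [0.0, 0])  # year -> [sum, count]
    for year, temp in rows:
        if skip_missing and temp == MISSING:
            continue  # drop placeholder readings before they reach the sum
        totals[year][0] += temp
        totals[year][1] += 1
    return {year: s / n for year, (s, n) in totals.items()}

# Hypothetical sample rows, not taken from city_temperature.csv.
sample = [(2019, 50.0), (2019, -99.0), (2019, 52.0), (200, -99.0)]
raw = average_by_year(sample, skip_missing=False)
clean = average_by_year(sample)
print(raw)
print(clean)
```

If the placeholder assumption holds, the equivalent step in the notebook would be a filter such as temperature_df.filter(col("AvgTemperature") != -99) placed before the groupBy.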



Task C - Find the average of AvgTemperature by City only located in the Country “Jordan”.

In [None]:
task_c = temperature_df.filter(temperature_df.Country == "Jordan").groupBy("City").agg(avg("AvgTemperature").alias("AverageTemperature"))
task_c.show()

result_c = task_c.select(concat_ws(", ", task_c.City, task_c.AverageTemperature.cast("string")).alias("output"))
result_c.coalesce(1).write.mode("overwrite").format("text").save("/content/drive/MyDrive/colab_outputs/A4/result_c")

+-----+------------------+
| City|AverageTemperature|
+-----+------------------+
|Amman| 64.16010144614734|
+-----+------------------+



Task D - For each country, find the capital and average of AvgTemperature of that capital city.

In [None]:
task_d = temperature_df.join(country_list_df, (temperature_df.City == country_list_df.capital) & (temperature_df.Country == country_list_df.country), "inner").groupBy("capital", temperature_df.Country).agg(avg("AvgTemperature").alias("AverageTemperature"))
task_d.show()

result_d = task_d.select(concat_ws(", ", task_d.capital, task_d.Country, task_d.AverageTemperature.cast("string")).alias("output"))
result_d.coalesce(1).write.mode("overwrite").format("text").save("/content/drive/MyDrive/colab_outputs/A4/result_d")

+-------------+--------------------+------------------+
|      capital|             Country|AverageTemperature|
+-------------+--------------------+------------------+
|       Zagreb|             Croatia|46.928613059902794|
|       Manama|             Bahrain| 80.63559248866856|
|    Abu Dhabi|United Arab Emirates| 82.19249946039282|
|  Addis Ababa|            Ethiopia| 25.45525551371705|
|      Bishkek|          Kyrgyzstan| 51.37300000000035|
|       Skopje|           Macedonia| 54.00780356179161|
|    Bujumbura|             Burundi|-65.39713845476541|
|     Helsinki|             Finland| 42.24399956831414|
|      Algiers|             Algeria|63.755439240232846|
|      Bangkok|            Thailand| 72.46408588158758|
|     Damascus|               Syria| 62.80611980572022|
|         Bern|         Switzerland|  49.0322793006693|
|        Cairo|               Egypt| 71.95347507014904|
|     Windhoek|             Namibia|57.990341031729066|
|        Rabat|             Morocco|  62.7334556

Task E - Solve question 1D (Task D) using a broadcast variable.

In [None]:
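# Use a separate name so the imported broadcast() function is not shadowed; reusing the name breaks the cell when it is run a second time.
country_list_bc = broadcast(country_list_df)

task_e = temperature_df.join(country_list_bc, (temperature_df.City == country_list_bc.capital) & (temperature_df.Country == country_list_bc.country), "inner").groupBy("capital", temperature_df.Country).agg(avg("AvgTemperature").alias("AverageTemperature"))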
task_e.show()

result_e = task_e.select(concat_ws(", ", task_e.capital, task_e.Country, task_e.AverageTemperature.cast("string")).alias("output"))
result_e.coalesce(1).write.mode("overwrite").format("text").save("/content/drive/MyDrive/colab_outputs/A4/result_e")

+-------------+--------------------+------------------+
|      capital|             Country|AverageTemperature|
+-------------+--------------------+------------------+
|       Zagreb|             Croatia|46.928613059902794|
|       Manama|             Bahrain| 80.63559248866856|
|    Abu Dhabi|United Arab Emirates| 82.19249946039282|
|  Addis Ababa|            Ethiopia| 25.45525551371705|
|      Bishkek|          Kyrgyzstan| 51.37300000000035|
|       Skopje|           Macedonia| 54.00780356179161|
|    Bujumbura|             Burundi|-65.39713845476541|
|     Helsinki|             Finland| 42.24399956831414|
|      Algiers|             Algeria|63.755439240232846|
|      Bangkok|            Thailand| 72.46408588158758|
|     Damascus|               Syria| 62.80611980572022|
|         Bern|         Switzerland|  49.0322793006693|
|        Cairo|               Egypt| 71.95347507014904|
|     Windhoek|             Namibia|57.990341031729066|
|        Rabat|             Morocco|  62.7334556
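
What the broadcast hint asks Spark to do can be sketched in plain Python: the small table becomes an in-memory lookup that every worker can hold, and the large table is streamed past it without being shuffled. This is a simplified single-process model, not Spark's implementation. The sample rows and the helper name `broadcast_hash_join` are hypothetical.

```python
def broadcast_hash_join(large_rows, small_rows):
    """Inner-join temperature rows to capitals via a lookup built from the small table."""
    # Build phase: index the small table by the join key (country, capital).
    capitals = {(country, capital) for country, capital in small_rows}
    # Probe phase: stream the large table and keep rows whose key is in the lookup.
    return [
        (city, country, temp)
        for country, city, temp in large_rows
        if (country, city) in capitals
    ]

# Hypothetical sample data for illustration only.
country_list = [("Jordan", "Amman"), ("Finland", "Helsinki")]
temperatures = [
    ("Jordan", "Amman", 64.0),
    ("Finland", "Helsinki", 42.0),
    ("Finland", "Tampere", 40.0),  # not a capital, dropped by the inner join
]
joined = broadcast_hash_join(temperatures, country_list)
print(joined)
```

In Spark, calling explain() on the joined DataFrame prints the physical plan, which should list a BroadcastHashJoin when the hint has taken effect.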

Task F - Solve question 1D (Task D) using User Defined Functions (UDFs) to keep only records from 2010 onward and to format each output line:

In [None]:
from pyspark.sql.types import BooleanType

# UDF that keeps only rows from 2010 onward.
@udf(BooleanType())
def filter_years(year):
    return year >= 2010

# UDF that builds the output sentence for one capital.
@udf(StringType())
def format_output(capital, country, avg_temp):
    return f"{capital} is the capital of {country} and its average temperature is {avg_temp:.2f}"

# Named task_f so that the Task E result is not overwritten.
task_f = temperature_df.join(country_list_df,(temperature_df.City == country_list_df.capital) & (temperature_df.Country == country_list_df.country), "inner").filter(filter_years(temperature_df.Year)).groupBy("capital", temperature_df.Country).agg(avg("AvgTemperature").alias("AverageTemperature")).withColumn("output", format_output(col("capital"), col("Country"), col("AverageTemperature")))
task_f.select("output").show(truncate=False)

task_f.select("output").coalesce(1).write.mode("overwrite").format("text").save("/content/drive/MyDrive/colab_outputs/A4/result_f")


+---------------------------------------------------------------------------------------+
|output                                                                                 |
+---------------------------------------------------------------------------------------+
|Zagreb is the capital of Croatia and its average temperature is 54.23                  |
|Manama is the capital of Bahrain and its average temperature is 81.08                  |
|Abu Dhabi is the capital of United Arab Emirates and its average temperature is 82.61  |
|Addis Ababa is the capital of Ethiopia and its average temperature is 50.38            |
|Bishkek is the capital of Kyrgyzstan and its average temperature is 52.57              |
|Skopje is the capital of Macedonia and its average temperature is 54.25                |
|Helsinki is the capital of Finland and its average temperature is 42.33                |
|Algiers is the capital of Algeria and its average temperature is 63.74                 |
|Bangkok i
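
Because a UDF body is ordinary Python, its logic can be checked without a Spark session before it is wrapped with udf. The sketch below shows null-tolerant variants of the two functions. The `_plain` names, the `first_year` parameter, and the choice to return None for a missing average are assumptions for illustration, not part of the original assignment, and the sample temperature is a made-up number.

```python
def filter_years_plain(year, first_year=2010):
    """True for years from first_year onward; False for a missing (None) year."""
    return year is not None and year >= first_year

def format_output_plain(capital, country, avg_temp):
    """Build the output sentence, or None when the average is missing."""
    if avg_temp is None:
        return None
    return f"{capital} is the capital of {country} and its average temperature is {avg_temp:.2f}"

# Quick checks on hypothetical inputs.
print(filter_years_plain(2009), filter_years_plain(2010), filter_years_plain(None))
print(format_output_plain("Amman", "Jordan", 61.5))
```

Wrapping works the same way as in the cell above, for example udf(filter_years_plain, BooleanType()).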