## **Question 1**

In [1]:
#Installing java and spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null #Install java
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz ## Install Apache Spark
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark

In [49]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

In [50]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from datetime import datetime, date, timedelta
from dateutil import relativedelta
from pyspark.sql import SQLContext, Row
from pyspark.sql import DataFrame
from pyspark.sql.functions import to_timestamp, to_date
from pyspark.sql import functions as F
from pyspark.sql.functions import collect_list, collect_set, concat, first, array_distinct, col, size, expr, max, desc, year
from pyspark.sql import DataFrame
import random

In [51]:
spark = SparkSession\
        .builder\
        .getOrCreate()

In [52]:
##Reading flight data 2015
flightData2015 = spark.read.csv("/content/2015-summary.csv", inferSchema = True, header = True)

In [53]:
flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [54]:
flightData2015.sort("count").explain()

== Physical Plan ==
*(1) Sort [count#7768 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#7768 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [id=#993]
   +- FileScan csv [DEST_COUNTRY_NAME#7766,ORIGIN_COUNTRY_NAME#7767,count#7768] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/content/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [55]:
spark.conf.set("spark.sql.shuffle.partitions", "5")


In [56]:
flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [59]:
flightData2015.createOrReplaceTempView("flight_data_2015")

In [60]:
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")
dataFrameWay = flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.count()
sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#7766], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#7766, 5), ENSURE_REQUIREMENTS, [id=#1063]
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#7766], functions=[partial_count(1)])
      +- FileScan csv [DEST_COUNTRY_NAME#7766] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/content/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#7766], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#7766, 5), ENSURE_REQUIREMENTS, [id=#1082]
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#7766], functions=[partial_count(1)])
      +- FileScan csv [DEST_COUNTRY_NAME#7766] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/content/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNT

In [61]:
spark.sql("SELECT max(count) from flight_data_2015").take(1)

[Row(max(count)=370002)]

In [62]:
flightData2015.select(max("count")).take(1)

[Row(max(count)=370002)]

In [63]:
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [64]:
from pyspark.sql.functions import desc
flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [65]:
flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.explain()

== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#7869L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#7766,destination_total#7869L])
+- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#7766], functions=[sum(cast(count#7768 as bigint))])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#7766, 5), ENSURE_REQUIREMENTS, [id=#1209]
      +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#7766], functions=[partial_sum(cast(count#7768 as bigint))])
         +- FileScan csv [DEST_COUNTRY_NAME#7766,count#7768] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/content/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




## **Question 2**

In [66]:
#Read the first dataframe
df1 = spark.read.csv("/content/GlobalLandTemperatures_GlobalLandTemperaturesByCountry.csv", inferSchema = True, header = True)
df1.show()
df1.describe().show()

+----------+-------------------+-----------------------------+-------+
|        dt| AverageTemperature|AverageTemperatureUncertainty|Country|
+----------+-------------------+-----------------------------+-------+
|1743-11-01| 4.3839999999999995|                        2.294|  Åland|
|1743-12-01|               null|                         null|  Åland|
|1744-01-01|               null|                         null|  Åland|
|1744-02-01|               null|                         null|  Åland|
|1744-03-01|               null|                         null|  Åland|
|1744-04-01|               1.53|                         4.68|  Åland|
|1744-05-01|  6.702000000000001|                        1.789|  Åland|
|1744-06-01| 11.609000000000002|                        1.577|  Åland|
|1744-07-01|             15.342|                         1.41|  Åland|
|1744-08-01|               null|                         null|  Åland|
|1744-09-01|             11.702|                        1.517|  Åland|
|1744-

In [67]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [20]:
df1.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- Country: string (nullable = true)



For which country and during what year was the highest average temperature  observed?
We observe from the data that Kuwait records the higest average temperature

In [68]:

#Order by average temperature in descending order
# df1.orderBy(df1['AverageTemperature'].desc()).show()

df1.sort(desc("AverageTemperature")).limit(5).show()

+----------+------------------+-----------------------------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|Country|
+----------+------------------+-----------------------------+-------+
|2012-07-01| 38.84200000000001|                        0.464| Kuwait|
|2000-07-01| 38.70500000000001|                        0.467| Kuwait|
|2010-07-01| 38.49500000000001|                        0.498| Kuwait|
|1998-08-01|            38.436|                        0.353| Kuwait|
|2000-08-01|            38.315|                        0.593| Kuwait|
+----------+------------------+-----------------------------+-------+



In [71]:
#Select the top row
df1.orderBy(df1['AverageTemperature'].desc()).head(1)[0]

Row(dt='2012-07-01', AverageTemperature=38.84200000000001, AverageTemperatureUncertainty=0.464, Country='Kuwait')

In [72]:
#Group by country
grouped_df = df1.groupBy('Country')

In [73]:
#Calculate max temp
max_temp = grouped_df.agg({"AverageTemperature": 'max'})
max_temp = max_temp.withColumnRenamed("max(AverageTemperature)", "MaxAverageTemperature")
max_temp.show()



+--------------------+---------------------+
|             Country|MaxAverageTemperature|
+--------------------+---------------------+
|         Afghanistan|               28.533|
|             Algeria|               35.829|
|             Andorra|   24.313000000000002|
|              Angola|                25.17|
|             Austria|               20.081|
|             Belarus|               22.811|
|             Belgium|               22.837|
|             Bolivia|   24.508000000000003|
|            Botswana|   28.860000000000007|
|            Cambodia|                30.43|
|          Cape Verde|               28.415|
|Central African R...|               29.871|
|Congo (Democratic...|               26.193|
|      Czech Republic|               22.092|
|             Denmark|   0.6990000000000001|
|   Equatorial Guinea|               27.932|
|              France|               23.662|
|    French Polynesia|               28.619|
|           Indonesia|               27.478|
|         

In [74]:
#Calculate min temp
min_temp = grouped_df.agg({"AverageTemperature": 'min'})
min_temp = min_temp.withColumnRenamed("min(AverageTemperature)", "MinAverageTemperature")
min_temp.show()

+--------------------+---------------------+
|             Country|MinAverageTemperature|
+--------------------+---------------------+
|         Afghanistan|   -4.552999999999999|
|             Algeria|    9.526000000000002|
|             Andorra|               -0.924|
|              Angola|               17.227|
|             Austria|  -11.077000000000002|
|             Belarus|              -16.527|
|             Belgium|  -6.0790000000000015|
|             Bolivia|               15.877|
|            Botswana|   12.574000000000002|
|            Cambodia|               21.896|
|          Cape Verde|               20.008|
|Central African R...|   22.473000000000003|
|Congo (Democratic...|               21.704|
|      Czech Republic|  -11.615999999999998|
|             Denmark|               -36.83|
|   Equatorial Guinea|                21.48|
|              France|   1.6020000000000003|
|    French Polynesia|               23.557|
|           Indonesia|                23.73|
|         

In [75]:
from pyspark.sql.functions import year, max, min


# Extract year from dt column and find min and max temperature
df = df1.withColumn("Year", year(df1["dt"])) \
       .groupBy("Country", "Year") \
       .agg(max("AverageTemperature").alias("MaxTemp"),
            min("AverageTemperature").alias("MinTemp"))

#  Calculate difference between min and max temperature
df = df.withColumn("TempDiff", df["MaxTemp"] - df["MinTemp"])

#  Show the result
df.show()


+-------+----+------------------+-------------------+------------------+
|Country|Year|           MaxTemp|            MinTemp|          TempDiff|
+-------+----+------------------+-------------------+------------------+
|  Åland|1758|15.119000000000002|              -5.71|            20.829|
|  Åland|1760|            16.539|            -10.332|26.871000000000002|
|  Åland|1763|            17.515| -4.896000000000001|            22.411|
|  Åland|1766|17.580000000000002|             -4.681|22.261000000000003|
|  Åland|1782|            15.356|              -5.24|            20.596|
|  Åland|1785|            15.885|             -6.432|            22.317|
|  Åland|1789|            18.543| -8.952000000000002|            27.495|
|  Åland|1794|16.625999999999998|             -1.833|18.458999999999996|
|  Åland|1796|            16.417|             -4.383|              20.8|
|  Åland|1803|            16.712|             -8.914|25.625999999999998|
|  Åland|1805|             16.43|-7.420999999999998

In [76]:
# Calculate difference
# Join the max_temp and min_temp DataFrames on the categorical variable(s)
temp_diff = max_temp.join(min_temp, on=["Country"])

# Calculate the temperature difference
temp_diff = temp_diff.withColumn("TemperatureDifference", temp_diff["MaxAverageTemperature"] - temp_diff["MinAverageTemperature"])

# Show the resulting DataFrame
temp_diff.show()


+--------------------+---------------------+---------------------+---------------------+
|             Country|MaxAverageTemperature|MinAverageTemperature|TemperatureDifference|
+--------------------+---------------------+---------------------+---------------------+
|         Afghanistan|               28.533|   -4.552999999999999|               33.086|
|             Algeria|               35.829|    9.526000000000002|   26.302999999999997|
|             Andorra|   24.313000000000002|               -0.924|   25.237000000000002|
|              Angola|                25.17|               17.227|    7.943000000000001|
|             Austria|               20.081|  -11.077000000000002|               31.158|
|             Belarus|               22.811|              -16.527|               39.338|
|             Belgium|               22.837|  -6.0790000000000015|               28.916|
|             Bolivia|   24.508000000000003|               15.877|    8.631000000000002|
|            Botswana

In [77]:
# Sort the temp_diff DataFrame in descending order by the "TempDifference" column
temp_diff = temp_diff.orderBy("TemperatureDifference", ascending=False)

# Select the top 10 records
top10_countries = temp_diff.limit(10)

# Show the resulting DataFrame
top10_countries.show()


+------------+---------------------+---------------------+---------------------+
|     Country|MaxAverageTemperature|MinAverageTemperature|TemperatureDifference|
+------------+---------------------+---------------------+---------------------+
|  Kazakhstan|   25.561999999999998|              -23.601|               49.163|
|    Mongolia|   20.715999999999998|  -27.441999999999997|   48.157999999999994|
|      Russia|               16.893|              -30.577|                47.47|
|      Canada|               14.796|              -28.736|               43.532|
|  Uzbekistan|               30.375|              -12.323|               42.698|
|Turkmenistan|               32.136|               -8.443|               40.579|
|     Finland|               19.132|                -21.2|               40.332|
|     Belarus|               22.811|              -16.527|               39.338|
|     Ukraine|               24.297|  -14.724000000000002|               39.021|
|     Estonia|   22.33199999

Which top 10 countries had the biggest change in average temperature?


In [78]:


print("Top 10 countries with the biggest change in average temperature:")
for row in top10_countries.collect():
    print(f"{row['Country']}: {row['TemperatureDifference']} degrees Celsius")


Top 10 countries with the biggest change in average temperature:
Kazakhstan: 49.163 degrees Celsius
Mongolia: 48.157999999999994 degrees Celsius
Russia: 47.47 degrees Celsius
Canada: 43.532 degrees Celsius
Uzbekistan: 42.698 degrees Celsius
Turkmenistan: 40.579 degrees Celsius
Finland: 40.332 degrees Celsius
Belarus: 39.338 degrees Celsius
Ukraine: 39.021 degrees Celsius
Estonia: 38.882999999999996 degrees Celsius


 CO2 emissions per capita per country

In [81]:
df2 = spark.read.csv("/content/CO2 emissions per capita per country.csv", inferSchema = True, header = True)

df2.describe().show()




+-------+------------+------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+-----------------+------------------+-----------------+-----------------+------------------+------------------+-----------------+-----------------+-----------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------

In [82]:
#Change df2 column name from 'Country name' to 'Country'
df2 = df2.withColumnRenamed('Country Name','Country')

df2.describe().show()

+-------+-----------+------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+-----------------+------------------+-----------------+-----------------+------------------+------------------+-----------------+-----------------+-----------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-------------

In [83]:
df2.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Country Code: string (nullable = true)
 |-- 1960: double (nullable = true)
 |-- 1961: double (nullable = true)
 |-- 1962: double (nullable = true)
 |-- 1963: double (nullable = true)
 |-- 1964: double (nullable = true)
 |-- 1965: double (nullable = true)
 |-- 1966: double (nullable = true)
 |-- 1967: double (nullable = true)
 |-- 1968: double (nullable = true)
 |-- 1969: double (nullable = true)
 |-- 1970: double (nullable = true)
 |-- 1971: double (nullable = true)
 |-- 1972: double (nullable = true)
 |-- 1973: double (nullable = true)
 |-- 1974: double (nullable = true)
 |-- 1975: double (nullable = true)
 |-- 1976: double (nullable = true)
 |-- 1977: double (nullable = true)
 |-- 1978: double (nullable = true)
 |-- 1979: double (nullable = true)
 |-- 1980: double (nullable = true)
 |-- 1981: double (nullable = true)
 |-- 1982: double (nullable = true)
 |-- 1983: double (nullable = true)
 |-- 1984: double (nullable = true)
 |-- 1985: d

In [84]:
from pyspark.sql.functions import *
from pyspark.sql import DataFrame
from typing import Iterable

In [85]:
def melt(
        df: DataFrame,
        id_vars: Iterable[str], value_vars: Iterable[str],
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    cols = id_vars + [
            col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

In [86]:
melt(df2, id_vars=['Country'],
            value_vars=['1960', '1961', '1962', '1963', '1964', '1965', '1966', '1967', '1968', '1969',
                        '1970', '1971', '1972', '1973', '1974', '1975', '1976', '1977', '1978', '1979',
                        '1980', '1981', '1982', '1983', '1984', '1985', '1986', '1987', '1988', '1989',
                        '1990', '1991', '1992', '1993', '1994', '1995', '1996', '1997', '1998', '1999',
                        '2000', '2001', '2002', '2003', '2004', '2005', '2006', '2007', '2008', '2009',
                        '2010', '2011', '2012', '2013', '2014','2015','2016','2017','2018']).show()

+-------+--------+-----+
|Country|variable|value|
+-------+--------+-----+
|  Aruba|    1960| null|
|  Aruba|    1961| null|
|  Aruba|    1962| null|
|  Aruba|    1963| null|
|  Aruba|    1964| null|
|  Aruba|    1965| null|
|  Aruba|    1966| null|
|  Aruba|    1967| null|
|  Aruba|    1968| null|
|  Aruba|    1969| null|
|  Aruba|    1970| null|
|  Aruba|    1971| null|
|  Aruba|    1972| null|
|  Aruba|    1973| null|
|  Aruba|    1974| null|
|  Aruba|    1975| null|
|  Aruba|    1976| null|
|  Aruba|    1977| null|
|  Aruba|    1978| null|
|  Aruba|    1979| null|
+-------+--------+-----+
only showing top 20 rows



In [88]:
long_df2 = melt(df2, id_vars=['Country'],
           value_vars=['1960', '1961', '1962', '1963', '1964', '1965', '1966', '1967', '1968', '1969',
                        '1970', '1971', '1972', '1973', '1974', '1975', '1976', '1977', '1978', '1979',
                        '1980', '1981', '1982', '1983', '1984', '1985', '1986', '1987', '1988', '1989',
                        '1990', '1991', '1992', '1993', '1994', '1995', '1996', '1997', '1998', '1999',
                        '2000', '2001', '2002', '2003', '2004', '2005', '2006', '2007', '2008', '2009',
                        '2010', '2011', '2012', '2013', '2014','2015','2016','2017','2018'])

In [89]:
long_df2 = long_df2.withColumnRenamed('variable','Year')

In [90]:
long_df2 = long_df2.withColumnRenamed('value', 'CO2')

In [91]:
long_df2.where(df2.Country == 'Afghanistan').show()

+-----------+----+-----------+
|    Country|Year|        CO2|
+-----------+----+-----------+
|Afghanistan|1960|0.046059897|
|Afghanistan|1961|0.053604304|
|Afghanistan|1962|0.073764791|
|Afghanistan|1963|0.074232685|
|Afghanistan|1964|0.086292452|
|Afghanistan|1965|0.101467397|
|Afghanistan|1966|0.107636955|
|Afghanistan|1967|0.123734289|
|Afghanistan|1968| 0.11549774|
|Afghanistan|1969| 0.08682346|
|Afghanistan|1970|0.150290627|
|Afghanistan|1971|0.166042044|
|Afghanistan|1972| 0.13076385|
|Afghanistan|1973|0.136279785|
|Afghanistan|1974|0.155649444|
|Afghanistan|1975|0.168928649|
|Afghanistan|1976|0.154787206|
|Afghanistan|1977|0.182963616|
|Afghanistan|1978|0.163159571|
|Afghanistan|1979|0.168376671|
+-----------+----+-----------+
only showing top 20 rows



In [92]:
# Prep temperature data for merging
from pyspark.sql.functions import year
df1.show()

+----------+-------------------+-----------------------------+-------+
|        dt| AverageTemperature|AverageTemperatureUncertainty|Country|
+----------+-------------------+-----------------------------+-------+
|1743-11-01| 4.3839999999999995|                        2.294|  Åland|
|1743-12-01|               null|                         null|  Åland|
|1744-01-01|               null|                         null|  Åland|
|1744-02-01|               null|                         null|  Åland|
|1744-03-01|               null|                         null|  Åland|
|1744-04-01|               1.53|                         4.68|  Åland|
|1744-05-01|  6.702000000000001|                        1.789|  Åland|
|1744-06-01| 11.609000000000002|                        1.577|  Åland|
|1744-07-01|             15.342|                         1.41|  Åland|
|1744-08-01|               null|                         null|  Åland|
|1744-09-01|             11.702|                        1.517|  Åland|
|1744-

In [93]:
df1.withColumn('Year', year(df1['dt'])).show()

+----------+-------------------+-----------------------------+-------+----+
|        dt| AverageTemperature|AverageTemperatureUncertainty|Country|Year|
+----------+-------------------+-----------------------------+-------+----+
|1743-11-01| 4.3839999999999995|                        2.294|  Åland|1743|
|1743-12-01|               null|                         null|  Åland|1743|
|1744-01-01|               null|                         null|  Åland|1744|
|1744-02-01|               null|                         null|  Åland|1744|
|1744-03-01|               null|                         null|  Åland|1744|
|1744-04-01|               1.53|                         4.68|  Åland|1744|
|1744-05-01|  6.702000000000001|                        1.789|  Åland|1744|
|1744-06-01| 11.609000000000002|                        1.577|  Åland|1744|
|1744-07-01|             15.342|                         1.41|  Åland|1744|
|1744-08-01|               null|                         null|  Åland|1744|
|1744-09-01|

In [94]:
df1_new = df1.withColumn('Year', year(df1['dt']))

In [95]:
df1_new = df1_new.groupBy('Country', 'Year').agg({'AverageTemperature': 'avg'})

In [96]:
df1_new.show()

+-------+----+-----------------------+
|Country|Year|avg(AverageTemperature)|
+-------+----+-----------------------+
|  Åland|1758|      4.074833333333333|
|  Åland|1760|                  4.322|
|  Åland|1763|      4.562499999999999|
|  Åland|1766|      5.894666666666667|
|  Åland|1782|     4.2837499999999995|
|  Åland|1785|     3.8438333333333325|
|  Åland|1789|      5.828916666666667|
|  Åland|1794|      6.725083333333335|
|  Åland|1796|      5.780083333333333|
|  Åland|1803|      4.346666666666667|
|  Åland|1805|     3.9274166666666663|
|  Åland|1816|      4.298166666666666|
|  Åland|1818|      5.775333333333332|
|  Åland|1820|                 4.4975|
|  Åland|1821|      5.174416666666667|
|  Åland|1822|      7.111999999999999|
|  Åland|1823|      5.347833333333333|
|  Åland|1824|      6.099166666666668|
|  Åland|1831|      5.051083333333334|
|  Åland|1839|      4.697666666666667|
+-------+----+-----------------------+
only showing top 20 rows



In [97]:
long_df2.join(df1_new, (long_df2["Country"] == df1_new["Country"]) &
   (long_df2["Year"] == df1_new["Year"])).show()

+-----------+----+-----------+-----------+----+-----------------------+
|    Country|Year|        CO2|    Country|Year|avg(AverageTemperature)|
+-----------+----+-----------+-----------+----+-----------------------+
|Afghanistan|1971|0.166042044|Afghanistan|1971|     14.823500000000001|
|Afghanistan|1977|0.182963616|Afghanistan|1977|     14.805416666666668|
|Afghanistan|1979|0.168376671|Afghanistan|1979|     14.262083333333335|
|Afghanistan|1980|0.132858608|Afghanistan|1980|     14.887333333333332|
|Afghanistan|1984|0.234987713|Afghanistan|1984|     14.245833333333332|
|Afghanistan|1985|0.297827727|Afghanistan|1985|     14.888749999999996|
|Afghanistan|1990|0.213449805|Afghanistan|1990|     14.993333333333332|
|Afghanistan|1994| 0.08003917|Afghanistan|1994|               14.75475|
|Afghanistan|1996|0.066044698|Afghanistan|1996|     14.426000000000002|
|Afghanistan|1997|0.059648382|Afghanistan|1997|     14.904000000000002|
|Afghanistan|2002|0.048715548|Afghanistan|2002|     15.537666666

In [98]:
merged_df = long_df2.join(df, (long_df2["Country"] == df["Country"]) & (long_df2["Year"] == df["Year"]))
merged_df.show()

+-----------+----+-----------+-----------+----+------------------+-------------------+------------------+
|    Country|Year|        CO2|    Country|Year|           MaxTemp|            MinTemp|          TempDiff|
+-----------+----+-----------+-----------+----+------------------+-------------------+------------------+
|Afghanistan|1971|0.166042044|Afghanistan|1971|26.555999999999997|             -0.588|            27.144|
|Afghanistan|1977|0.182963616|Afghanistan|1977|            27.552|-2.4530000000000003|            30.005|
|Afghanistan|1979|0.168376671|Afghanistan|1979|            27.513|              1.735|25.778000000000002|
|Afghanistan|1980|0.132858608|Afghanistan|1980|27.456999999999997| 0.5329999999999999|26.923999999999996|
|Afghanistan|1984|0.234987713|Afghanistan|1984|            27.623|             -0.988|            28.611|
|Afghanistan|1985|0.297827727|Afghanistan|1985|            28.254|              1.224|             27.03|
|Afghanistan|1990|0.213449805|Afghanistan|1990

In [99]:
##Library import
from pyspark.sql.functions import *



In [100]:
result = merged_df.withColumn("CO2", col("CO2").cast("double"))

result = result.withColumn("TempDiff", col("TempDiff").cast("double"))

from pyspark.sql.functions import *

result.stat.corr('CO2','TempDiff')

0.2785130259259797

The correlation coefficient of 0.2785 between CO2 emissions and temperature change indicates a positive correlation, suggesting that as CO2 emissions increase, temperature change tends to increase as well. However, the value of 0.2785 represents a relatively weak positive correlation between these two variables.