## Climate Change: Project Tasmania
*   On Canvas, you will find two datasets. The first dataset contains average land temperatures by country, with dates running from 1750 to 2015. Answer the following questions:

---
1.   In which country and in what year was the highest average temperature observed?

---


In [17]:
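from pyspark.sql import SparkSession

# get the active Spark session (creates one if the notebook has not started it yet)
spark = SparkSession.builder.getOrCreate()

temperature = spark\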
              .read\
              .option("inferSchema", "true")\
              .option("header", "true")\
              .csv("/content/drive/MyDrive/Colab Notebooks/GlobalLandTemperatures_GlobalLandTemperaturesByCountry.csv")
temperature.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- Country: string (nullable = true)



In [18]:
temperature.show(5)

+----------+------------------+-----------------------------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|Country|
+----------+------------------+-----------------------------+-------+
|1743-11-01|4.3839999999999995|                        2.294|  Åland|
|1743-12-01|              null|                         null|  Åland|
|1744-01-01|              null|                         null|  Åland|
|1744-02-01|              null|                         null|  Åland|
|1744-03-01|              null|                         null|  Åland|
+----------+------------------+-----------------------------+-------+
only showing top 5 rows



In [19]:
# DataFrame way
# find the highest average temperature
max_temp = temperature.agg({'AverageTemperature':'max'}).collect()[0][0]
max_temp_info = temperature.filter(temperature.AverageTemperature == max_temp).first()

print(max_temp_info)

Row(dt='2012-07-01', AverageTemperature=38.84200000000001, AverageTemperatureUncertainty=0.464, Country='Kuwait')


In [20]:
# SQL way
temperature.createOrReplaceTempView("temperature")

max_temp_sql = spark.sql("""
SELECT *
FROM temperature
WHERE AverageTemperature = (SELECT MAX(AverageTemperature)
FROM temperature)
""")

max_temp_sql.show()

+----------+------------------+-----------------------------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|Country|
+----------+------------------+-----------------------------+-------+
|2012-07-01| 38.84200000000001|                        0.464| Kuwait|
+----------+------------------+-----------------------------+-------+
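Both cells above scan the table twice (once for the maximum, once for the matching rows), and `.first()` in the DataFrame version keeps only one row if several months tie, whereas the SQL version returns all of them. The plain-Python sketch below, run on made-up toy rows rather than the real CSV, shows the logic the SQL version implements: drop nulls, take the maximum, return every row that reaches it. In Spark, a sort-based alternative would be something like `temperature.orderBy(F.desc('AverageTemperature')).first()`, assuming `from pyspark.sql import functions as F`; descending sorts place nulls last by default.

```python
from typing import List, Optional, Tuple

# (dt, AverageTemperature, Country); None stands for a missing reading.
# Toy values for illustration only, not taken from the dataset.
Row = Tuple[str, Optional[float], str]

rows: List[Row] = [
    ("2000-01-01", 4.4, "CountryA"),
    ("2000-02-01", None, "CountryA"),
    ("2000-07-01", 31.5, "CountryB"),
    ("2001-07-01", 31.5, "CountryC"),
    ("2001-08-01", 29.0, "CountryB"),
]


def hottest_rows(records: List[Row]) -> List[Row]:
    """Return every row whose temperature equals the maximum, skipping nulls
    (the same result as the SQL subquery version, including ties)."""
    temps = [temp for _, temp, _ in records if temp is not None]
    if not temps:
        return []
    best = max(temps)
    return [r for r in records if r[1] == best]


for dt, temp, country in hottest_rows(rows):
    print(country, dt[:4], temp)
```

With the toy rows, both tied countries are reported, which `.first()` would not do.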



---
2.   Analyze the data by country over the years and name the top 10 countries with the biggest change in average temperature.

---

In [21]:
# DataFrame way
# find the top 10 countries with the biggest change in average temperature,
# measured as the highest minus the lowest monthly average per country
from pyspark.sql import functions as F

diff_df = temperature.filter(temperature.AverageTemperature.isNotNull())\
  .groupBy('Country')\
  .agg((F.max('AverageTemperature') - F.min('AverageTemperature')).alias('Diff'))\
  .sort('Diff', ascending = False).limit(10)

diff_df.show()

+------------+------------------+
|     Country|              Diff|
+------------+------------------+
|  Kazakhstan|            49.163|
|    Mongolia|48.157999999999994|
|      Russia|             47.47|
|      Canada|            43.532|
|  Uzbekistan|            42.698|
|Turkmenistan|            40.579|
|     Finland|            40.332|
|     Belarus|            39.338|
|     Ukraine|            39.021|
|     Estonia|38.882999999999996|
+------------+------------------+



In [22]:
# SQL way
temperature.createOrReplaceTempView("temperature")

diff_sql = spark.sql("""
SELECT Country, MAX(AverageTemperature) - MIN(AverageTemperature) as Diff
FROM temperature
WHERE AverageTemperature IS NOT NULL
GROUP BY Country
ORDER BY Diff DESC
LIMIT 10
""")

diff_sql.show()

+------------+------------------+
|     Country|              Diff|
+------------+------------------+
|  Kazakhstan|            49.163|
|    Mongolia|48.157999999999994|
|      Russia|             47.47|
|      Canada|            43.532|
|  Uzbekistan|            42.698|
|Turkmenistan|            40.579|
|     Finland|            40.332|
|     Belarus|            39.338|
|     Ukraine|            39.021|
|     Estonia|38.882999999999996|
+------------+------------------+
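Both cells measure the spread between a country's single hottest and single coldest monthly average over the whole record. That spread is dominated by the seasonal cycle, which helps explain why continental climates such as Kazakhstan, Mongolia and Russia top the list. If "change" is meant as warming over the years, one option (a different metric from the one computed above) is to average each year's months and fit a least-squares slope per country. The plain-Python sketch below uses made-up monthly series for two hypothetical countries to show how the two metrics can disagree.

```python
from statistics import mean
from typing import Dict, List

# Hypothetical monthly averages: {country: {year: [12 monthly values]}}.
# CountryA has a large seasonal swing but no warming; CountryB has a small
# swing but warms by 0.5 degrees every year.
seasonal = [-20.0, -15.0, -5.0, 5.0, 15.0, 20.0, 22.0, 20.0, 12.0, 2.0, -10.0, -18.0]
mild = [10.0, 10.5, 11.0, 11.5, 12.0, 12.5, 13.0, 12.5, 12.0, 11.5, 11.0, 10.5]
data: Dict[str, Dict[int, List[float]]] = {
    "CountryA": {y: seasonal for y in range(2000, 2010)},
    "CountryB": {y: [t + 0.5 * (y - 2000) for t in mild] for y in range(2000, 2010)},
}


def monthly_range(by_year: Dict[int, List[float]]) -> float:
    """Max minus min over every monthly value (what the Spark cells compute)."""
    values = [t for months in by_year.values() for t in months]
    return max(values) - min(values)


def annual_trend(by_year: Dict[int, List[float]]) -> float:
    """Least-squares slope of the yearly mean temperature, in degrees per year."""
    years = sorted(by_year)
    means = [mean(by_year[y]) for y in years]
    x_bar, y_bar = mean(years), mean(means)
    num = sum((x - x_bar) * (m - y_bar) for x, m in zip(years, means))
    den = sum((x - x_bar) ** 2 for x in years)
    return num / den


for country, by_year in data.items():
    print(country, round(monthly_range(by_year), 2), round(annual_trend(by_year), 3))
```

CountryA wins on the max-minus-min metric even though its annual mean never changes, while CountryB is the only one that actually warms.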



*    The second dataset contains CO2 emissions per capita by country from 1960 to 2014.
---
1.   Merge the two datasets by country and keep the data from 1960 to 2014.

---



In [23]:
CO2emissions = spark\
              .read\
              .option("inferSchema", "true")\
              .option("header", "true")\
              .csv("/content/drive/MyDrive/Colab Notebooks/CO2 emissions per capita per country.csv")
CO2emissions.printSchema()

root
 |-- Country Name: string (nullable = true)
 |-- Country Code: string (nullable = true)
 |-- 1960: double (nullable = true)
 |-- 1961: double (nullable = true)
 |-- 1962: double (nullable = true)
 |-- 1963: double (nullable = true)
 |-- 1964: double (nullable = true)
 |-- 1965: double (nullable = true)
 |-- 1966: double (nullable = true)
 |-- 1967: double (nullable = true)
 |-- 1968: double (nullable = true)
 |-- 1969: double (nullable = true)
 |-- 1970: double (nullable = true)
 |-- 1971: double (nullable = true)
 |-- 1972: double (nullable = true)
 |-- 1973: double (nullable = true)
 |-- 1974: double (nullable = true)
 |-- 1975: double (nullable = true)
 |-- 1976: double (nullable = true)
 |-- 1977: double (nullable = true)
 |-- 1978: double (nullable = true)
 |-- 1979: double (nullable = true)
 |-- 1980: double (nullable = true)
 |-- 1981: double (nullable = true)
 |-- 1982: double (nullable = true)
 |-- 1983: double (nullable = true)
 |-- 1984: double (nullable = true)
 |-- 19

In [24]:
CO2emissions.show(5)

+------------+------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+----+----+----+----+
|Country Name|Country Code|       1960|       1961|       1962|       1963|       1964|       1965|       1966|       1967|       1968|       1969|       1970|       1971|       1972|       1973|       1974|       1975|       1976|       1977|       1978|       1979|       1980|       1981| 

In [25]:
temperature.createOrReplaceTempView("temperature")

df1 = spark.sql("""
SELECT Country, Year, AVG(AverageTemperature) as AvgTemp
FROM (SELECT Country, LEFT(dt, 4) as Year, AverageTemperature
FROM temperature
WHERE AverageTemperature IS NOT NULL) AS tmp
WHERE Year BETWEEN 1960 AND 2014
GROUP BY Country, Year
ORDER BY Country, Year
""")

df1.show(10)

+-----------+----+------------------+
|    Country|Year|           AvgTemp|
+-----------+----+------------------+
|Afghanistan|1960|13.985416666666667|
|Afghanistan|1961|14.064916666666667|
|Afghanistan|1962|13.768666666666666|
|Afghanistan|1963|15.033416666666666|
|Afghanistan|1964|13.084916666666667|
|Afghanistan|1965|14.101833333333333|
|Afghanistan|1966|14.342999999999998|
|Afghanistan|1967|          13.66325|
|Afghanistan|1968|13.762333333333332|
|Afghanistan|1969|13.805083333333336|
+-----------+----+------------------+
only showing top 10 rows
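`AVG` ignores missing months, so a year with only a few recorded months (for example, only summer readings) gets an annual mean biased toward those months. One possible safeguard, which is an addition and not part of the original analysis, is to keep only country-years with all 12 months present; in the Spark SQL query above that would be a `HAVING COUNT(AverageTemperature) = 12` clause after `GROUP BY`. The plain-Python sketch below applies the same filter to hypothetical rows.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, List, Optional, Tuple

# Hypothetical monthly rows: (dt, AverageTemperature, Country); None = missing.
rows: List[Tuple[str, Optional[float], str]] = [
    (f"1990-{m:02d}-01", 10.0 + m, "CountryA") for m in range(1, 13)
] + [
    ("1991-06-01", 25.0, "CountryA"),
    ("1991-07-01", 27.0, "CountryA"),
    ("1991-08-01", None, "CountryA"),
]


def complete_annual_means(records) -> Dict[Tuple[str, str], float]:
    """Average each (country, year) over non-null months, keeping only
    years that have all 12 monthly readings."""
    groups: Dict[Tuple[str, str], List[float]] = defaultdict(list)
    for dt, temp, country in records:
        if temp is not None:
            groups[(country, dt[:4])].append(temp)  # dt[:4] mirrors LEFT(dt, 4)
    return {key: mean(temps) for key, temps in groups.items() if len(temps) == 12}


print(complete_annual_means(rows))
```

The partial year 1991 (two summer readings) is dropped instead of producing an inflated annual mean.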



In [26]:
# from Github: https://gist.github.com/korkridake/972e315e5ce094096e17c6ad1ef599fd
from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable

def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """
    Convert :class:`DataFrame` from wide to long format.
    Source: https://stackoverflow.com/questions/41670103/how-to-melt-spark-dataframe
    """

    # -------------------------------------------------------------------------------
    # Create array<struct<variable: str, value: ...>>
    # -------------------------------------------------------------------------------
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name)) 
        for c in value_vars))

    # -------------------------------------------------------------------------------
    # Add to the DataFrame and explode
    # -------------------------------------------------------------------------------
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    # list() so this also works if id_vars is passed as a tuple or other iterable
    cols = list(id_vars) + [
            col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)
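The `melt` helper turns each wide row (one column per year) into one long row per year. The plain-Python sketch below mirrors that reshape on a hypothetical two-row table so the output shape is easy to check. As an aside, Spark 3.4 and later provide a built-in `DataFrame.unpivot` (also available as `DataFrame.melt`), which could replace the helper on a recent enough Spark version; check the installed version before relying on it.

```python
from typing import Dict, Iterable, List

Record = Dict[str, object]


def melt_rows(rows: List[Record], id_vars: Iterable[str], value_vars: Iterable[str],
              var_name: str = "variable", value_name: str = "value") -> List[Record]:
    """Wide-to-long reshape: one output row per (input row, value column)."""
    id_vars, value_vars = list(id_vars), list(value_vars)
    out: List[Record] = []
    for row in rows:
        for c in value_vars:
            long_row = {k: row[k] for k in id_vars}
            long_row[var_name] = c
            long_row[value_name] = row[c]
            out.append(long_row)
    return out


# Hypothetical wide table with one column per year (values are made up).
wide = [
    {"Country Name": "CountryA", "1960": 1.0, "1961": None},
    {"Country Name": "CountryB", "1960": 2.5, "1961": 2.7},
]
long_rows = melt_rows(wide, ["Country Name"], ["1960", "1961"], "Year", "CO2 emissions")
# Like df2.dropna(), drop rows whose emission value is missing.
long_rows = [r for r in long_rows if r["CO2 emissions"] is not None]
for r in long_rows:
    print(r)
```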

In [27]:
# melt the year columns 1960-2014 into (Year, CO2 emissions) rows
year_cols = [str(y) for y in range(1960, 2015)]
df2 = melt(CO2emissions, id_vars = ['Country Name'], value_vars = year_cols)
df2 = df2.withColumnRenamed('variable', 'Year')
df2 = df2.withColumnRenamed('value', 'CO2 emissions')
df2 = df2.dropna()

df2.show(10)

+------------+----+-------------+
|Country Name|Year|CO2 emissions|
+------------+----+-------------+
|       Aruba|1986|  2.868319392|
|       Aruba|1987|  7.235198033|
|       Aruba|1988|  10.02617921|
|       Aruba|1989|   10.6347326|
|       Aruba|1990|  26.37450321|
|       Aruba|1991|   26.0461298|
|       Aruba|1992|   21.4425588|
|       Aruba|1993|  22.00078616|
|       Aruba|1994|  21.03624511|
|       Aruba|1995|  20.77193616|
+------------+----+-------------+
only showing top 10 rows



In [40]:
# merge two datasets on country name and year
# (rename 'Country Name' first so both join keys share one name and 'Year' is not duplicated)
co2 = df2.withColumnRenamed('Country Name', 'Country')
joined_df = df1.join(co2, on = ['Country', 'Year'], how = 'inner')
joined_df = joined_df.select('Country', 'Year', 'AvgTemp', 'CO2 emissions')
joined_df = joined_df[(joined_df['Year'] >= 1960) & (joined_df['Year'] <= 2014)]
# joined_df = joined_df.orderBy('Country', 'Year')

# show the merged dataset
joined_df.show()

+--------------------+----+-------------------+-------------+
|             Country|Year|            AvgTemp|CO2 emissions|
+--------------------+----+-------------------+-------------+
|           Australia|1979| 22.252333333333336|  14.12909198|
|          Azerbaijan|2007|  12.51483333333333|    3.5553401|
|              Canada|1980| -4.416916666666666|  18.02276896|
|Central African R...|2007|  26.06783333333333|  0.059175593|
|            Colombia|1978| 25.122250000000005|  1.572806879|
|             Comoros|1968|            25.4145|  0.083123654|
|          Costa Rica|2006| 26.514916666666664|  1.647633189|
|             Denmark|1987|-18.201666666666664|  11.37930503|
|              Greece|1985| 15.218000000000002|  6.100162266|
|               Haiti|2006|           27.22225|   0.22447544|
|                Iraq|1969|           22.39125|  2.360357896|
|               Italy|1963| 12.604583333333332|  3.215100133|
|               Japan|2000| 12.833833333333336|  9.622351624|
|       
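An inner join silently drops every country whose name is spelled differently in the two files. A quick check, not part of the original notebook, is a left anti join such as `df1.select('Country').distinct().join(df2.select(F.col('Country Name').alias('Country')).distinct(), on='Country', how='left_anti')`, which lists temperature-file countries with no CO2 match (assuming `from pyspark.sql import functions as F`). The plain-Python sketch below shows the same set difference on hypothetical name lists; "Russian Federation" is used only as an illustrative World Bank-style spelling and should be checked against the actual CO2 file.

```python
from typing import Dict, Iterable, List


def unmatched(left: Iterable[str], right: Iterable[str]) -> List[str]:
    """Names in `left` with no exact match in `right` (a left anti join on name)."""
    right_set = set(right)
    return sorted({name for name in left if name not in right_set})


# Hypothetical name lists, for illustration only.
temperature_countries = ["Canada", "Russia", "Kuwait", "Åland"]
co2_countries = ["Canada", "Russian Federation", "Kuwait", "Aruba"]

missing = unmatched(temperature_countries, co2_countries)
print(missing)

# A hand-written alias map (hypothetical) can rename names before joining.
aliases: Dict[str, str] = {"Russian Federation": "Russia"}
co2_fixed = [aliases.get(n, n) for n in co2_countries]
print(unmatched(temperature_countries, co2_fixed))
```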

---
2.   What is the correlation between CO2 emissions and temperature change?
---


In [30]:
joined_df.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- AvgTemp: double (nullable = true)
 |-- CO2 emissions: double (nullable = true)



In [38]:
# calculate the Pearson correlation between temperature and CO2 emissions within each country,
# then average those coefficients across countries
from pyspark.sql.functions import corr

correlation = joined_df.groupBy('Country').agg(corr('AvgTemp', 'CO2 emissions').alias('Correlation'))

avg_corr = correlation.agg({"Correlation": "avg"}).collect()[0][0]

print('Average per-country correlation between temperature and CO2 emissions:', avg_corr)

Average per-country correlation between temperature and CO2 emissions: 0.21089195280794804
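`corr` computes the Pearson correlation within each country, and the cell then takes a plain mean of those coefficients. Two caveats may be worth considering. First, correlation coefficients are not additive, so they are often averaged on the Fisher z scale (z = atanh(r)) and transformed back. Second, both temperature and emissions tend to trend over time, so correlating the levels can partly reflect shared trends; correlating year-over-year differences is closer to "temperature change". The plain-Python sketch below illustrates both ideas on hypothetical series. It is a suggestion, not the method used above.

```python
import math
from statistics import mean
from typing import Dict, List, Sequence


def pearson(x: Sequence[float], y: Sequence[float]) -> float:
    """Pearson correlation coefficient, the statistic Spark's corr() returns."""
    mx, my = mean(x), mean(y)
    sxy = sum((a - mx) * (b - my) for a, b in zip(x, y))
    sxx = sum((a - mx) ** 2 for a in x)
    syy = sum((b - my) ** 2 for b in y)
    return sxy / math.sqrt(sxx * syy)


def fisher_mean(rs: List[float]) -> float:
    """Average correlations on the Fisher z scale, then map back to r."""
    return math.tanh(mean(math.atanh(r) for r in rs))


def diffs(values: Sequence[float]) -> List[float]:
    """Year-over-year changes of a yearly series."""
    return [b - a for a, b in zip(values, values[1:])]


# Hypothetical yearly series per country: (AvgTemp, CO2 emissions).
series: Dict[str, tuple] = {
    "CountryA": ([10.0, 10.2, 10.1, 10.5, 10.4], [1.0, 1.1, 1.3, 1.4, 1.6]),
    "CountryB": ([20.0, 19.8, 20.3, 20.1, 20.6], [5.0, 4.8, 5.1, 5.3, 5.2]),
}

level_rs = [pearson(t, c) for t, c in series.values()]
change_rs = [pearson(diffs(t), diffs(c)) for t, c in series.values()]
print("mean r (levels):", mean(level_rs))
print("Fisher-z mean r (levels):", fisher_mean(level_rs))
print("mean r (year-over-year changes):", mean(change_rs))
```

Note that `math.atanh` fails for r exactly equal to 1 or -1, so a real run over many countries would need to clip or skip such values.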
