In [19]:
df_emissions = spark.read.option("header", "true").csv("s3://spulido1st1800241/co2_emissions/co2_emission.csv")
df_temperature = spark.read.option("header", "true").csv("s3://spulido1st1800241/land-temperature-by-country/GlobalLandTemperaturesByCountry.csv")

df_emissions.show()
df_temperature.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+----+----+----------------+
|     entity|code|year|annual_emissions|
+-----------+----+----+----------------+
|Afghanistan| AFG|1949|           14656|
|Afghanistan| AFG|1950|           84272|
|Afghanistan| AFG|1951|           91600|
|Afghanistan| AFG|1952|           91600|
|Afghanistan| AFG|1953|          106256|
|Afghanistan| AFG|1954|          106256|
|Afghanistan| AFG|1955|          153888|
|Afghanistan| AFG|1956|          183200|
|Afghanistan| AFG|1957|          293120|
|Afghanistan| AFG|1958|          329760|
|Afghanistan| AFG|1959|       384571.42|
|Afghanistan| AFG|1960|       413883.42|
|Afghanistan| AFG|1961|        490797.7|
|Afghanistan| AFG|1962|       688594.27|
|Afghanistan| AFG|1963|       706735.98|
|Afghanistan| AFG|1964|       838550.83|
|Afghanistan| AFG|1965|      1006916.53|
|Afghanistan| AFG|1966|      1091158.82|
|Afghanistan| AFG|1967|      1281865.11|
|Afghanistan| AFG|1968|      1223389.69|
+-----------+----+----+----------------+
only showing top

In [20]:
from pyspark.sql import functions as F

top_years_emissions = df_emissions.groupBy("year") \
                        .agg(F.sum("annual_emissions") \
                        .alias("total_emissions")) \
                        .orderBy(F.desc("total_emissions")) \
                        .limit(10)


top_years_emissions.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+--------------------+
|year|     total_emissions|
+----+--------------------+
|2017|   8.947275082983E10|
|2016|8.824961753229005E10|
|2015|8.768654789835999E10|
|2014|8.762096132615002E10|
|2013|8.700744654957999E10|
|2012|8.665079532765002E10|
|2011|8.524360993775002E10|
|2010|8.238109186037001E10|
|2008|7.990947993998001E10|
|2009|7.867301490757996E10|
+----+--------------------+

In [21]:
hottest_countries_all_years = df_temperature.groupBy("Country") \
    .agg(F.avg("AverageTemperature").alias("average_temperature")) \
    .orderBy(F.desc("average_temperature")).limit(10)


hottest_countries_all_years.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+
|             Country|average_temperature|
+--------------------+-------------------+
|            Djibouti| 28.816602533172542|
|                Mali| 28.441976570819996|
|        Burkina Faso| 28.083506760410998|
|             Senegal|  27.96737499999997|
|               Aruba|  27.92039022051774|
|United Arab Emirates| 27.693994700582884|
|          Mauritania| 27.620255504352293|
|              Gambia| 27.538551816239316|
|               Niger| 27.458972540045757|
|             Curaçao| 27.353414669223397|
+--------------------+-------------------+

In [22]:
hottest_years = df_temperature.withColumn("year", F.year(F.to_date("dt", "yyyy-MM-dd"))) \
    .groupBy("year") \
    .agg(F.avg("AverageTemperature").alias("average_temperature")) \
    .orderBy(F.desc("average_temperature")) \
    .limit(10)


hottest_years.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-------------------+
|year|average_temperature|
+----+-------------------+
|2013| 19.877007153806858|
|2010| 19.629806129476588|
|2007|  19.61039428374656|
|1998| 19.596783057851248|
|2002| 19.565523415977967|
|2009|  19.54638533057848|
|2006|   19.5391349862259|
|2003|   19.5250867768595|
|2005| 19.499590220385706|
|2012|   19.4687441460055|
+----+-------------------+

In [23]:
countries_with_highest_avg_emissions = df_emissions.groupBy("entity") \
    .agg(F.avg("annual_emissions").alias("average_annual_emissions")) \
    .orderBy(F.desc("average_annual_emissions")).limit(10)


countries_with_highest_avg_emissions.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------------------+
|              entity|average_annual_emissions|
+--------------------+------------------------+
|               World|     5.899791376895131E9|
|              Russia|    1.7049000668898306E9|
|       United States|    1.4957990314644194E9|
|               EU-28|     1.321566501059925E9|
|               China|     7.879389530948819E8|
|Asia and Pacific ...|     6.668811690486891E8|
|      Europe (other)|     5.900381650936329E8|
|             Ukraine|    4.3722688639152545E8|
|               Japan|    4.1536405464533335E8|
|             Germany|    4.0073287623141575E8|
+--------------------+------------------------+

In [26]:
df_emissions = df_emissions.withColumn("year", df_emissions["year"].cast("integer"))
df_temperature = df_temperature.withColumn("dt", F.to_date(df_temperature["dt"], 'yyyy-MM-dd'))

joined_df = df_emissions.join(df_temperature, (df_emissions.entity == df_temperature.Country) & 
                              (df_emissions.year == F.year(df_temperature.dt)))

result_df = joined_df.groupBy(df_emissions.entity.alias("country"))\
                     .agg(F.avg("annual_emissions").alias("average_annual_emissions"),
                          F.avg("averagetemperature").alias("average_temperature"))\
                     .orderBy(F.desc("average_annual_emissions"))\
                     .limit(10)

result_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+------------------------+-------------------+
|       country|average_annual_emissions|average_temperature|
+--------------+------------------------+-------------------+
|        Russia|    1.7072645889497716E9|-4.7807027439024425|
| United States|    1.5359203697698061E9|  8.603572522159553|
|         China|     8.803365430384828E8|   6.63357261613691|
|       Ukraine|     4.528754742027375E8|  8.359509146341464|
|       Germany|     3.930806270675309E8|   8.18554135338344|
|         Japan|     3.916217583245287E8| 12.044405606407325|
|United Kingdom|     2.866108890654621E8|  8.491605346912824|
|        Africa|    2.2951465119022956E8|  24.07420274551212|
|    Kazakhstan|     2.036414017164386E8| 6.0926646341463435|
|   South Korea|     1.927339258113457E8| 12.320580097087376|
+--------------+------------------------+-------------------+

In [27]:
df_emissions = df_emissions.withColumn("year", df_emissions["year"].cast("integer"))
df_temperature = df_temperature.withColumn("dt", F.to_date(df_temperature["dt"], 'yyyy-MM-dd'))

joined_df = df_emissions.join(df_temperature, (df_emissions.entity == df_temperature.Country) & 
                              (df_emissions.year == F.year(df_temperature.dt)))

result_df = joined_df.groupBy(df_emissions.entity.alias("country"))\
                     .agg(F.avg("annual_emissions").alias("average_annual_emissions"),
                          F.avg("averagetemperature").alias("average_temperature"))\
                     .orderBy(F.asc("average_annual_emissions"))\
                     .limit(10)

result_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------------------+-------------------+
|             country|average_annual_emissions|average_temperature|
+--------------------+------------------------+-------------------+
|                Niue|       5066.788571428571|  25.53457824427485|
|          Montserrat|      25665.700483091787| 26.893069354838705|
|            Kiribati|      30822.748815165876|  27.11393987341773|
|            Dominica|       52960.12749003984| 26.586035904255326|
|               Tonga|       53465.66274509804| 23.577045811518325|
|             Comoros|       63643.17808219178| 26.080251524390267|
|British Virgin Is...|       70023.11111111111|  26.83983091787443|
|               Samoa|        86757.7725490196|  26.64870026178011|
|    Christmas Island|       95002.28571428571| 26.078083333333346|
|             Grenada|       97433.66274509804| 27.213242146596894|
+--------------------+------------------------+-------------------+

In [28]:
df_emissions = df_emissions.withColumn("year", df_emissions["year"].cast("integer"))
df_temperature = df_temperature.withColumn("dt", F.to_date(df_temperature["dt"], 'yyyy-MM-dd'))

joined_df = df_emissions.join(df_temperature, (df_emissions.entity == df_temperature.Country) & 
                              (df_emissions.year == F.year(df_temperature.dt)))

result_df = joined_df.groupBy(df_emissions.entity.alias("country"))\
                     .agg(F.avg("annual_emissions").alias("average_annual_emissions"),
                          F.avg("averagetemperature").alias("average_temperature"))\
                     .orderBy(F.desc("average_temperature"))\
                     .limit(10)

result_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------------------+-------------------+
|             country|average_annual_emissions|average_temperature|
+--------------------+------------------------+-------------------+
|            Djibouti|       250946.1462352942|  29.08503010471201|
|                Mali|       489513.2796347031| 28.887868902439017|
|        Burkina Faso|       674113.4111210774|  28.50503892215568|
|             Senegal|       3060232.469315066| 28.429446646341475|
|               Aruba|      1285447.9788127847| 28.397669207317065|
|United Arab Emirates|    5.8651823794703156E7| 28.211400914634176|
|          Mauritania|       969136.3652968036| 28.097750000000016|
|              Gambia|      152451.13725490196| 27.906327225130873|
|               Niger|        622252.244080717|  27.83997455089821|
|               Palau|       166984.8510638298|  27.56056107954545|
+--------------------+------------------------+-------------------+

In [29]:
df_emissions = df_emissions.withColumn("year", df_emissions["year"].cast("integer"))
df_temperature = df_temperature.withColumn("dt", F.to_date(df_temperature["dt"], 'yyyy-MM-dd'))

joined_df = df_emissions.join(df_temperature, (df_emissions.entity == df_temperature.Country) & 
                              (df_emissions.year == F.year(df_temperature.dt)))

result_df = joined_df.groupBy(df_emissions.entity.alias("country"))\
                     .agg(F.avg("annual_emissions").alias("average_annual_emissions"),
                          F.avg("averagetemperature").alias("average_temperature"))\
                     .orderBy(F.asc("average_temperature"))\
                     .limit(10)

result_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------------------------+-------------------+
|   country|average_annual_emissions|average_temperature|
+----------+------------------------+-------------------+
| Greenland|       438429.9294117647|-18.089484293193717|
|   Denmark|     2.246963978358721E7|-18.000785644531284|
|    Canada|    1.2899122241300549E8|-5.2000344112769445|
|    Russia|    1.7072645889497716E9|-4.7807027439024425|
|  Mongolia|       6697050.589333336|-0.1202643979057595|
|    Norway|    1.3186810988514472E7|0.18958486238532107|
|   Finland|    1.8644064125252075E7| 1.4474734273318848|
|   Iceland|      1694371.3012709017| 1.8720033482142833|
|    Sweden|     2.661082450093876E7| 2.5144833965844433|
|Kyrgyzstan|       6422252.760574727| 4.3928884615384645|
+----------+------------------------+-------------------+