# Setting up Spark Instance

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null  # Install Java
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz  # Download Spark
!tar xf spark-3.4.1-bin-hadoop3.tgz  # Unpack Spark
!pip install -q findspark  # Install findspark, a Python library for finding Spark
!pip install pyspark  # Install PySpark


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=2904a45e738c25c408ec57fed3e75d5f56cac42acaa3dcf26aed273e8a876ac3
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
!pip install pyspark==3.4.1

Collecting pyspark==3.4.1
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285388 sha256=66f8fc95474f3c7895b0f3a9fe50ed1297bab1801f79a62b6a42f90352bad415
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.5.1
    Uninstalling pyspark-3.5.1:
      Successfully uninstalled pyspark-3.5.1
Successfully installed pyspark-3.4.1


In [3]:
import os
from pyspark.sql import SparkSession
# Define Java and Spark home path in Google Colab
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

In [4]:
import findspark
findspark.init()

In [5]:
# Initialize a SparkSession
spark = SparkSession.builder \
 .appName("CSV to DataFrame") \
 .getOrCreate()

In [6]:
from google.colab import files

uploaded = files.upload()

for filename in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=filename, length=len(uploaded[filename])))

Saving 2015-summary.csv to 2015-summary.csv
User uploaded file "2015-summary.csv" with length 7080 bytes


In [7]:
flightData2015 = spark.read.option("inferSchema", "true").option("header", "true").csv("2015-summary.csv")
flightData2015.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [8]:
flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [9]:
flightData2015.sort("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#19 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#19 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=45]
      +- FileScan csv [DEST_COUNTRY_NAME#17,ORIGIN_COUNTRY_NAME#18,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [10]:
spark.conf.set("spark.sql.shuffle.partitions", "5")
flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [11]:
flightData2015.createOrReplaceTempView("flight_data_2015")
# SQL way to group by and count
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1) as count
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

# DataFrame way to group by and count
dataFrameWay = flightData2015.groupBy("DEST_COUNTRY_NAME").count()

# Show the execution plan for both methods
sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 5), ENSURE_REQUIREMENTS, [plan_id=67]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 5), ENSURE_REQUIREMENTS, [plan_id=80]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/2015-su

In [12]:
spark.sql("SELECT max(count) from flight_data_2015").take(1)
from pyspark.sql.functions import max
flightData2015.select(max("count")).take(1)

[Row(max(count)=370002)]

In [13]:
maxSql = spark.sql("""
select DEST_COUNTRY_NAME, sum(count) as destination_total
from flight_data_2015
group by DEST_COUNTRY_NAME
order by sum(count) DESC
limit 5
""")
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [14]:
from pyspark.sql.functions import desc

flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .sum("count")\
  .withColumnRenamed("sum(count)", "destination_total")\
  .sort(desc("destination_total"))\
  .limit(5)\
  .show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



# Climate Change


## First Dataset

In [15]:
from google.colab import files
import pandas as pd

# Upload the file
uploaded = files.upload()

# Save the uploaded file to the local file system
filename = list(uploaded.keys())[0]  # Get the filename from the uploaded files
with open(filename, 'wb') as f:
    f.write(uploaded[filename])

# Assuming you have a Spark session already created and named spark
# Load the data using Spark
df = spark.read.csv(filename, header=True, inferSchema=True)

# Show some data to confirm it's loaded correctly
df.show()


Saving globalTemp.csv to globalTemp.csv
+----------+-------------------+-----------------------------+-------+
|        dt| AverageTemperature|AverageTemperatureUncertainty|Country|
+----------+-------------------+-----------------------------+-------+
|1743-11-01| 4.3839999999999995|                        2.294|  Åland|
|1743-12-01|               null|                         null|  Åland|
|1744-01-01|               null|                         null|  Åland|
|1744-02-01|               null|                         null|  Åland|
|1744-03-01|               null|                         null|  Åland|
|1744-04-01|               1.53|                         4.68|  Åland|
|1744-05-01|  6.702000000000001|                        1.789|  Åland|
|1744-06-01| 11.609000000000002|                        1.577|  Åland|
|1744-07-01|             15.342|                         1.41|  Åland|
|1744-08-01|               null|                         null|  Åland|
|1744-09-01|             11.702|     

### For which country and during what year, the highest average temperature was observed?

In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import max

# Initialize the Spark session
spark = SparkSession.builder\
        .appName("Temperature Analysis")\
        .getOrCreate()

In [17]:
max_temp_value = df.agg(max("AverageTemperature")).collect()[0][0]

# Filter the DataFrame to find the row(s) with this maximum temperature
max_temp = df.filter(df['AverageTemperature'] == max_temp_value)

# Show the result
max_temp.show()

# Optional: print in a formatted string
print(f"Highest Average Temperature Recorded:\n{max_temp.collect()}")

+----------+------------------+-----------------------------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|Country|
+----------+------------------+-----------------------------+-------+
|2012-07-01| 38.84200000000001|                        0.464| Kuwait|
+----------+------------------+-----------------------------+-------+

Highest Average Temperature Recorded:
[Row(dt=datetime.date(2012, 7, 1), AverageTemperature=38.84200000000001, AverageTemperatureUncertainty=0.464, Country='Kuwait')]


### Analyze the data by country over the years, and name which are the top 10 countries with the biggest change in average temperature.

In [18]:
from pyspark.sql.functions import col, year

# Convert 'dt' to a date type and extract the year
df = df.withColumn("Year", year(col("dt")))

# Filter out rows with null average temperatures
df = df.filter(col("AverageTemperature").isNotNull())


In [19]:
from pyspark.sql.functions import min, max

# Group by Country and Year, then aggregate
min_temps = df.groupBy("Country").agg(min("AverageTemperature").alias("min_temp"))
max_temps = df.groupBy("Country").agg(max("AverageTemperature").alias("max_temp"))


In [20]:
from pyspark.sql.functions import abs

# Join min and max temps on Country
temp_change = min_temps.join(max_temps, "Country")

# Calculate the temperature change
temp_change = temp_change.withColumn("temp_change", abs(col("max_temp") - col("min_temp")))


In [21]:
top_10_countries = temp_change.orderBy(col("temp_change").desc()).limit(10)

# Show the top 10 countries with the biggest temperature change
top_10_countries.show()


+------------+-------------------+------------------+------------------+
|     Country|           min_temp|          max_temp|       temp_change|
+------------+-------------------+------------------+------------------+
|  Kazakhstan|            -23.601|25.561999999999998|            49.163|
|    Mongolia|-27.441999999999997|20.715999999999998|48.157999999999994|
|      Russia|            -30.577|            16.893|             47.47|
|      Canada|            -28.736|            14.796|            43.532|
|  Uzbekistan|            -12.323|            30.375|            42.698|
|Turkmenistan|             -8.443|            32.136|            40.579|
|     Finland|              -21.2|            19.132|            40.332|
|     Belarus|            -16.527|            22.811|            39.338|
|     Ukraine|-14.724000000000002|            24.297|            39.021|
|     Estonia|-16.551000000000002|22.331999999999997|38.882999999999996|
+------------+-------------------+-----------------

## Second Dataset

In [22]:
from google.colab import files
import pandas as pd

# Upload the file
uploaded = files.upload()

# Save the uploaded file to the local file system
filename = list(uploaded.keys())[0]  # Get the filename from the uploaded files
with open(filename, 'wb') as f:
    f.write(uploaded[filename])

# Assuming you have a Spark session already created and named spark
# Load the data using Spark
df_co2 = spark.read.csv(filename, header=True, inferSchema=True)

# Show some data to confirm it's loaded correctly
df_co2.show()

Saving CO2 emissions.csv to CO2 emissions.csv
+--------------------+------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+----+----+----+----+
|        Country Name|Country Code|       1960|       1961|       1962|       1963|       1964|       1965|       1966|       1967|       1968|       1969|       1970|       1971|       1972|       1973|       1974|       1975|       1976

### Merge the two datasets by country, and keep the data from 1960 to 2014.


In [23]:
from pyspark.sql.functions import col, year

# Assuming `dt` is a string that represents dates, first convert `dt` to date type if not already done
df_temperature = df.withColumn("dt", col("dt").cast("date"))

# Extract year from the `dt` column if it hasn't been done already
df_temperature = df_temperature.withColumn("Year", year(col("dt")))

# Confirm the columns
df_temperature.show(5)

+----------+------------------+-----------------------------+-------+----+
|        dt|AverageTemperature|AverageTemperatureUncertainty|Country|Year|
+----------+------------------+-----------------------------+-------+----+
|1743-11-01|4.3839999999999995|                        2.294|  Åland|1743|
|1744-04-01|              1.53|                         4.68|  Åland|1744|
|1744-05-01| 6.702000000000001|                        1.789|  Åland|1744|
|1744-06-01|11.609000000000002|                        1.577|  Åland|1744|
|1744-07-01|            15.342|                         1.41|  Åland|1744|
+----------+------------------+-----------------------------+-------+----+
only showing top 5 rows



In [24]:
from pyspark.sql.functions import lit

# Create the melted DataFrame for CO2 emissions
years = [str(year) for year in range(1960, 2015)]  # List of years as strings
df_co2_long = None

for year in years:
    temp_df = df_co2.select(col("Country Name").alias("Country"),
                            col(year).alias("CO2_emissions"),
                            lit(int(year)).alias("Year").cast('integer'))  # Ensure Year is an integer
    df_co2_long = temp_df if df_co2_long is None else df_co2_long.union(temp_df)

df_co2_long.show(5)


+-----------+-------------+----+
|    Country|CO2_emissions|Year|
+-----------+-------------+----+
|      Aruba|         null|1960|
|Afghanistan|  0.046059897|1960|
|     Angola|  0.097471604|1960|
|    Albania|  1.258194928|1960|
|    Andorra|         null|1960|
+-----------+-------------+----+
only showing top 5 rows



In [25]:
df_merged = df_temperature.join(df_co2_long, ["Country", "Year"], "inner")
df_merged.show(5)

+-----------+----+----------+------------------+-----------------------------+-------------+
|    Country|Year|        dt|AverageTemperature|AverageTemperatureUncertainty|CO2_emissions|
+-----------+----+----------+------------------+-----------------------------+-------------+
|Afghanistan|1960|1960-01-01|             2.262|           0.5720000000000001|  0.046059897|
|Afghanistan|1960|1960-02-01| 7.007999999999999|                         0.82|  0.046059897|
|Afghanistan|1960|1960-03-01| 5.832000000000002|                        0.583|  0.046059897|
|Afghanistan|1960|1960-04-01|12.312000000000001|                        0.341|  0.046059897|
|Afghanistan|1960|1960-05-01|            18.853|                        0.504|  0.046059897|
+-----------+----+----------+------------------+-----------------------------+-------------+
only showing top 5 rows



In [26]:
# Filter the merged DataFrame to ensure the data range is correct
df_merged = df_merged.filter((col("Year") >= 1960) & (col("Year") <= 2014))
df_cleaned = df_merged.na.drop()
# Show the resulting DataFrame to confirm the range
df_merged.show()


+-----------+----+----------+------------------+-----------------------------+-------------+
|    Country|Year|        dt|AverageTemperature|AverageTemperatureUncertainty|CO2_emissions|
+-----------+----+----------+------------------+-----------------------------+-------------+
|Afghanistan|1960|1960-01-01|             2.262|           0.5720000000000001|  0.046059897|
|Afghanistan|1960|1960-02-01| 7.007999999999999|                         0.82|  0.046059897|
|Afghanistan|1960|1960-03-01| 5.832000000000002|                        0.583|  0.046059897|
|Afghanistan|1960|1960-04-01|12.312000000000001|                        0.341|  0.046059897|
|Afghanistan|1960|1960-05-01|            18.853|                        0.504|  0.046059897|
|Afghanistan|1960|1960-06-01|            25.436|                         0.39|  0.046059897|
|Afghanistan|1960|1960-07-01|              26.6|                        0.287|  0.046059897|
|Afghanistan|1960|1960-08-01|            25.749|                      

### Correlation between CO2 emissions and temperature change?


In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max, min, avg

# Assuming Spark session is already started
spark = SparkSession.builder.appName("Annual Temp and CO2 Correlation").getOrCreate()

# Calculate the annual temperature range (max temp - min temp) for each country and year
temp_range = df_merged.groupBy("Country", "Year").agg(
    (max("AverageTemperature") - min("AverageTemperature")).alias("TempRange")
)

# Calculate the annual average CO2 emissions for each country and year
co2_avg = df_merged.groupBy("Country", "Year").agg(
    avg("CO2_emissions").alias("AvgCO2Emissions")
)

# Join the temperature range and CO2 dataframes on Country and Year
df_final = temp_range.join(co2_avg, ["Country", "Year"])

# Calculate the Pearson correlation coefficient between annual temperature range and CO2 emissions
correlation = df_final.stat.corr("TempRange", "AvgCO2Emissions")
print(f"Correlation between Annual Temperature Range and CO2 Emissions: {correlation}")

Correlation between Annual Temperature Range and CO2 Emissions: 0.27851302592597854
