## Part 1: Setting up your Spark instance

### Install Spark and Set up Environment

In [None]:
# Run below commands
!apt-get install openjdk-8-jdk-headless -qq > /dev/null #Install java
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz ## Install Apache Spark
!tar xf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark
!pip install pyspark==3.4.0

Collecting pyspark==3.4.0
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317122 sha256=28cb3e3f96e4609fa4b1e84301533810294340d2147079c7e236cefae12cbc6a
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
import os
from pyspark.sql import SparkSession

# Google Colab locations of the JDK and of the Spark distribution unpacked in the setup cell.
JAVA_HOME_DIR = "/usr/lib/jvm/java-8-openjdk-amd64"
SPARK_HOME_DIR = "/content/spark-3.4.1-bin-hadoop3"
os.environ.update({"JAVA_HOME": JAVA_HOME_DIR, "SPARK_HOME": SPARK_HOME_DIR})

In [None]:
# Notebook-wide imports.
import findspark
# Puts SPARK_HOME's python/ and py4j onto sys.path.
# NOTE(review): pyspark was already imported in the previous cell (the pip-installed copy),
# so this call does not change which pyspark module is in use.
findspark.init()
from pyspark.sql import SparkSession

# from pyspark import SparkConf, SparkContext
# NOTE(review): datetime/date/timedelta, relativedelta, SQLContext, Row, DataFrame and random
# are not used by any visible cell; SQLContext is a legacy entry point superseded by SparkSession.
from datetime import datetime, date, timedelta
from dateutil import relativedelta
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from pyspark.sql import DataFrame
# WARNING: this wildcard import can shadow Python builtins (e.g. max, min, sum, round, abs)
# with Spark Column functions for every later cell; prefer the namespaced `F.` form below.
from pyspark.sql.functions import *
from pyspark.sql.functions import to_timestamp, to_date
# Namespaced access to Spark SQL functions (F.max, F.col, ...), used by later cells.
from pyspark.sql import functions as F
from pyspark.sql.functions import collect_list, collect_set, concat, first, array_distinct, col, size, expr
import random

In [None]:
# Start (or reuse) the SparkSession used throughout Part 1.
spark = (
    SparkSession.builder
    .appName("End-to-End Example")
    .getOrCreate()
)

### End-to-End Example

In [None]:
# Mount Google Drive so the CSV files are reachable from the Colab VM.
from google.colab import drive
drive.mount('/content/drive')

# Read the 2015 flight summary: first row is the header, column types are inferred.
FLIGHT_DATA_2015_PATH = '/content/drive/My Drive/flight-data/2015-summary.csv'
flightData2015 = (
    spark.read
    .option('inferSchema', 'true')
    .option('header', 'true')
    .csv(FLIGHT_DATA_2015_PATH)
)

Mounted at /content/drive


In [None]:
# Peek at the first three rows (returned as a list of Row objects).
flightData2015.head(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [None]:
# Show the physical plan for a global sort on `count` (orderBy is an alias of sort).
flightData2015.orderBy('count').explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#19 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#19 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=33]
      +- FileScan csv [DEST_COUNTRY_NAME#17,ORIGIN_COUNTRY_NAME#18,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/drive/My Drive/flight-data/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [None]:
# Lower the number of shuffle partitions (the plans above show the default of 200) for this
# small dataset. The key must be spelled exactly "spark.sql.shuffle.partitions": a misspelled
# key is accepted without error but has no effect.
spark.conf.set("spark.sql.shuffle.partitions", "5")
# Note: many rows tie at count=1, so which two rows come back is not deterministic.
flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [None]:
# Expose the DataFrame to SQL as global_temp.flight_data_2015.
flightData2015.createOrReplaceGlobalTempView(name="flight_data_2015")

In [None]:
# The same per-destination row count, written once in SQL and once with the DataFrame API;
# printing both physical plans shows they compile to the same execution.
sqlWay = spark.sql("""
select DEST_COUNTRY_NAME, count(1)
from global_temp.flight_data_2015
group by DEST_COUNTRY_NAME""")
dataFrameWay = flightData2015.groupBy("DEST_COUNTRY_NAME").count()
for plan_source in (sqlWay, dataFrameWay):
    plan_source.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 200), ENSURE_REQUIREMENTS, [plan_id=55]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/drive/My Drive/flight-data/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 200), ENSURE_REQUIREMENTS, [plan_id=68]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex

In [None]:
# Largest single `count` value, via SQL.
max_count_query = "select max(count) from global_temp.flight_data_2015"
spark.sql(max_count_query).head(1)

[Row(max(count)=370002)]

In [None]:
# Largest single `count` value, via the DataFrame API. Use the namespaced F.max instead of
# `from pyspark.sql.functions import max`, which would replace Python's builtin max.
from pyspark.sql import functions as F
flightData2015.select(F.max("count")).take(1)

[Row(max(count)=370002)]

In [None]:
# Five destination countries with the highest total flight count, via SQL.
top_destinations_query = """
    select DEST_COUNTRY_NAME, sum(count) as destination_total
    from global_temp.flight_data_2015
    group by DEST_COUNTRY_NAME
    order by sum(count) DESC
    limit 5
"""
maxSql = spark.sql(top_destinations_query)
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [None]:
from pyspark.sql.functions import desc

# Same top-five query expressed with the DataFrame API.
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
    .show()
)

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



## Part 2: Climate Change

In [None]:
# Obtain the SparkSession for Part 2.
# NOTE(review): a session already exists from Part 1, so getOrCreate() returns it; the new
# appName presumably does not rename the running application.
spark = (
    SparkSession.builder
    .appName("Climate Analysis")
    .getOrCreate()
)

### The first dataset: temperature

In [None]:
# Per-country monthly land temperatures, stored on the mounted Google Drive.
file_path = "/content/drive/My Drive/UC Davis - Spring/GlobalLandTemperatures_GlobalLandTemperaturesByCountry.csv"

# Header row present; let Spark infer column types.
csv_reader = spark.read.option('inferSchema','true').option('header','true')
temperature = csv_reader.csv(file_path)

In [None]:
# Peek at the first five rows; early years contain many null readings.
temperature.head(5)

[Row(dt=datetime.date(1743, 11, 1), AverageTemperature=4.3839999999999995, AverageTemperatureUncertainty=2.294, Country='Åland'),
 Row(dt=datetime.date(1743, 12, 1), AverageTemperature=None, AverageTemperatureUncertainty=None, Country='Åland'),
 Row(dt=datetime.date(1744, 1, 1), AverageTemperature=None, AverageTemperatureUncertainty=None, Country='Åland'),
 Row(dt=datetime.date(1744, 2, 1), AverageTemperature=None, AverageTemperatureUncertainty=None, Country='Åland'),
 Row(dt=datetime.date(1744, 3, 1), AverageTemperature=None, AverageTemperatureUncertainty=None, Country='Åland')]

In [None]:
# Expose the temperature data to SQL as global_temp.temperature.
temperature.createOrReplaceGlobalTempView(name="temperature")

In [None]:
# Single hottest monthly average across all countries (nulls sort last in DESC order).
hottest_month_query = """
    select Country, dt, year(dt) as year, AverageTemperature
    from global_temp.temperature
    order by AverageTemperature desc
    limit 1
"""
maxSql = spark.sql(hottest_month_query)
maxSql.show()

+-------+----------+----+------------------+
|Country|        dt|year|AverageTemperature|
+-------+----------+----+------------------+
| Kuwait|2012-07-01|2012| 38.84200000000001|
+-------+----------+----+------------------+



The highest monthly average temperature in the dataset (about 38.84) was recorded in Kuwait in July 2012.

In [None]:
# Ten countries with the largest gap between their warmest and coolest monthly average
# over the whole record.
temperature_spread_query = """
    select country, (max(AverageTemperature)-min(AverageTemperature)) as temp_diff
    from global_temp.temperature
    group by country
    order by temp_diff desc
    limit 10
"""
diffSql = spark.sql(temperature_spread_query)
diffSql.show()

+------------+------------------+
|     country|         temp_diff|
+------------+------------------+
|  Kazakhstan|            49.163|
|    Mongolia|48.157999999999994|
|      Russia|             47.47|
|      Canada|            43.532|
|  Uzbekistan|            42.698|
|Turkmenistan|            40.579|
|     Finland|            40.332|
|     Belarus|            39.338|
|     Ukraine|            39.021|
|     Estonia|38.882999999999996|
+------------+------------------+



### The second dataset: CO2

In [None]:
# CO2 emissions per capita, one column per year, stored on the mounted Google Drive.
file_path = "/content/drive/My Drive/UC Davis - Spring/CO2 emissions per capita per country.csv"

# Header row present; let Spark infer column types.
csv_reader = spark.read.option('inferSchema','true').option('header','true')
co2 = csv_reader.csv(file_path)

In [None]:
from pyspark.sql.functions import expr

# Reshape co2 from wide (one column per year) to long (one row per country and year) using
# Spark SQL's stack(n, label_1, value_1, ..., label_n, value_n).
years = [str(y) for y in range(1960, 2015)]
# Emit each year label as an integer literal (1960, not '1960') so the resulting `year`
# column is INT, matching year(dt) on the temperature side. String labels would make the
# later join depend on an implicit string-to-int cast.
cols = ", ".join("{0}, `{0}`".format(year) for year in years)
stack_expr = "stack({}, {}) as (year, CO2_Per_Capita)".format(len(years), cols)
# Drop country-years without a measurement.
co2_df_transformed = co2.select("`Country Name`", expr(stack_expr)).where("CO2_Per_Capita is not null")
co2_df_transformed.createOrReplaceTempView("co2_transformed")

In [None]:
from pyspark.sql.functions import avg
from pyspark.sql.functions import year

# Append an integer calendar year derived from each monthly `dt`, so temperatures can be
# joined with the yearly CO2 data. Despite its name, the view holds the raw monthly rows.
temp_df = temperature.select("*", year("dt").alias("year"))
temp_df.createOrReplaceTempView("avg_temperature")

### Join the two datasets without aggregation

In [None]:
# Row-level join: each monthly temperature reading is paired with its country's CO2 value
# for the same year, so every country-year appears once per available month.
# NOTE(review): inner join on the raw country name, so countries named differently in
# the two sources are dropped without warning.
query = """
SELECT
    t.Country,
    t.year,
    t.AverageTemperature,
    c.CO2_Per_Capita
FROM avg_temperature t
JOIN co2_transformed c
ON t.Country = c.`Country Name` AND t.year = c.year
WHERE t.year BETWEEN 1960 AND 2014
"""

result_df = spark.sql(query)
result_df.show(n=20)

+-----------+----+------------------+--------------+
|    Country|year|AverageTemperature|CO2_Per_Capita|
+-----------+----+------------------+--------------+
|Afghanistan|1960|             2.262|   0.046059897|
|Afghanistan|1960| 7.007999999999999|   0.046059897|
|Afghanistan|1960| 5.832000000000002|   0.046059897|
|Afghanistan|1960|12.312000000000001|   0.046059897|
|Afghanistan|1960|            18.853|   0.046059897|
|Afghanistan|1960|            25.436|   0.046059897|
|Afghanistan|1960|              26.6|   0.046059897|
|Afghanistan|1960|            25.749|   0.046059897|
|Afghanistan|1960|            20.537|   0.046059897|
|Afghanistan|1960|             14.27|   0.046059897|
|Afghanistan|1960|             7.155|   0.046059897|
|Afghanistan|1960|             1.811|   0.046059897|
|Afghanistan|1961|             1.482|   0.053604304|
|Afghanistan|1961|              1.59|   0.053604304|
|Afghanistan|1961| 9.260000000000002|   0.053604304|
|Afghanistan|1961|            12.818|   0.0536

### Join the two datasets with aggregation: annual temperature range

In [None]:
from pyspark.sql import functions as F

# For each country and year, the spread between the warmest and the coolest monthly average.
# F.max / F.min skip nulls, so years with missing months may understate the true range.
temp_range_df = (
    temp_df
    .groupBy("Country", "year")
    .agg(
        F.max("AverageTemperature").alias("MaxTemperature"),
        F.min("AverageTemperature").alias("MinTemperature"),
    )
    .withColumn("TemperatureRange", F.col("MaxTemperature") - F.col("MinTemperature"))
)
temp_range_df.createOrReplaceTempView("temperature_range")

In [None]:
# Peek at three aggregated country-year rows.
temp_range_df.head(3)

[Row(Country='Albania', year=1821, MaxTemperature=20.861, MinTemperature=2.539, TemperatureRange=18.322),
 Row(Country='Albania', year=1943, MaxTemperature=23.89, MinTemperature=1.446, TemperatureRange=22.444),
 Row(Country='Albania', year=1960, MaxTemperature=22.531, MinTemperature=4.127, TemperatureRange=18.404)]

In [None]:
# Join the annual temperature range with CO2 per capita, matching on country AND year,
# then order the result for readability.
query = """
SELECT
    t.Country,
    t.year,
    t.TemperatureRange,
    c.CO2_Per_Capita
FROM temperature_range t
JOIN co2_transformed c
ON t.Country = c.`Country Name` AND t.year = c.year
WHERE t.year BETWEEN 1960 AND 2014
order by Country, year
"""

result_df = spark.sql(query)
result_df.show(n=20)

+-----------+----+------------------+--------------+
|    Country|year|  TemperatureRange|CO2_Per_Capita|
+-----------+----+------------------+--------------+
|Afghanistan|1960|            24.789|   0.046059897|
|Afghanistan|1961|25.881000000000004|   0.053604304|
|Afghanistan|1962|            26.345|   0.073764791|
|Afghanistan|1963|            24.746|   0.074232685|
|Afghanistan|1964|30.723999999999997|   0.086292452|
|Afghanistan|1965|            24.679|   0.101467397|
|Afghanistan|1966|            23.475|   0.107636955|
|Afghanistan|1967|27.567999999999998|   0.123734289|
|Afghanistan|1968|            25.204|    0.11549774|
|Afghanistan|1969|28.424999999999997|    0.08682346|
|Afghanistan|1970|            25.525|   0.150290627|
|Afghanistan|1971|            27.144|   0.166042044|
|Afghanistan|1972|            28.685|    0.13076385|
|Afghanistan|1973|29.387999999999998|   0.136279785|
|Afghanistan|1974|27.743999999999996|   0.155649444|
|Afghanistan|1975|             26.83|   0.1689

In [None]:
# Pearson correlation between the annual temperature RANGE (not the average temperature)
# and CO2 per capita, pooled over every country-year row of result_df.
# Caveat: pooling mixes between-country and within-country variation.
correlation = result_df.stat.corr("TemperatureRange", "CO2_Per_Capita")
print("The Pearson correlation coefficient between annual temperature range and CO2 per capita is:", correlation)

The Pearson correlation coefficient between average temperature and CO2 per capita is: 0.36907906344988894
