## Getting Started with Spark

### Author - Manickashree "Madhu"

#### 1. Setting up your Spark instance


In [1]:
# Environment setup, run as shell commands through IPython's `!` escape:
# install a headless OpenJDK 8, download and unpack the Spark 3.4.0 / Hadoop 3
# distribution into the current working directory, then install the findspark
# helper and a PySpark build pinned to the same 3.4.0 version.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null #Install java
!wget -q https://archive.apache.org/dist/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz ## Install Apache Spark
!tar xf spark-3.4.0-bin-hadoop3.tgz
!pip install -q findspark
!pip install pyspark==3.4.0

Collecting pyspark==3.4.0
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317122 sha256=58a68e0cbc569fab103e441cce01fd060575a6a27a5085bda824d0e3946c55a6
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [2]:
import os
from pyspark.sql import SparkSession

# Point JAVA_HOME and SPARK_HOME at the expected JDK location and at the
# Spark distribution directory under /content.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.4.0-bin-hadoop3",
    }
)

In [3]:
ls /content/spark-3.4.0-bin-hadoop3/bin


[0m[01;32mbeeline[0m*               load-spark-env.sh  [01;32mspark-class[0m*          sparkR.cmd        spark-sql.cmd
beeline.cmd            [01;32mpyspark[0m*           [01;32mspark-class2.cmd[0m*     [01;32mspark-shell[0m*      [01;32mspark-submit[0m*
[01;32mdocker-image-tool.sh[0m*  pyspark2.cmd       spark-class.cmd       spark-shell2.cmd  spark-submit2.cmd
[01;32mfind-spark-home[0m*       pyspark.cmd        [01;32mspark-connect-shell[0m*  spark-shell.cmd   spark-submit.cmd
find-spark-home.cmd    [01;32mrun-example[0m*       [01;32msparkR[0m*               [01;32mspark-sql[0m*
load-spark-env.cmd     run-example.cmd    sparkR2.cmd           spark-sql2.cmd


In [4]:
import findspark
# findspark.init() is called before the remaining pyspark imports in this cell.
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from datetime import datetime, date, timedelta
from dateutil import relativedelta
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
# NOTE(review): the star import below, together with the explicit `max` / `min`
# imports further down, rebinds the built-in names `max` and `min` to Spark
# column functions for the rest of the notebook. The `F` alias imported below
# is the unambiguous way to reach the same functions.
from pyspark.sql.functions import *
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
from pyspark.sql.functions import to_timestamp, to_date
from pyspark.sql import functions as F
from pyspark.sql.functions import collect_list, collect_set, concat, first, last, array_distinct, col, size, expr
from pyspark.sql import DataFrame
from pyspark.sql.functions import isnan
from pyspark.sql.functions import year
from pyspark.sql.functions import max
from pyspark.sql.functions import desc
from pyspark.sql.functions import max, min
import random
import pandas as pd

In [5]:
from pyspark.sql import SparkSession

# Build the SparkSession used by the rest of the notebook; getOrCreate()
# hands back an existing session if one is already running.
spark = (
    SparkSession.builder
    .appName("my session")
    .getOrCreate()
)



In [8]:
# Load the 2015 flight summary: the first line supplies the column names and
# column types are inferred from the data.
flightData2015 = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/2015-summary.csv")
)

In [9]:
# Peek at the first three rows, returned to the driver as Row objects.
flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [10]:
# Print the physical plan Spark would use to sort the flights by `count`.
flightData2015.sort("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#19 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#19 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=33]
      +- FileScan csv [DEST_COUNTRY_NAME#17,ORIGIN_COUNTRY_NAME#18,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [11]:
# Set the number of partitions used by Spark SQL shuffles to 5 for this session.
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [12]:
# Fetch the two rows with the smallest `count` (ascending sort).
flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [13]:
# Register the DataFrame as a temporary view so it can be queried with spark.sql.
flightData2015.createOrReplaceTempView("flight_data_2015")

In [14]:
# SQL formulation: number of rows per destination country.
sqlWay = spark.sql("""
  SELECT DEST_COUNTRY_NAME, count(1)
  FROM flight_data_2015
  GROUP BY DEST_COUNTRY_NAME
  """)

In [15]:
# DataFrame formulation of the same per-destination row count.
dataFrameWay = (
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .count()
)

In [16]:
# Print both physical plans so the SQL and DataFrame versions can be compared.
sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 5), ENSURE_REQUIREMENTS, [plan_id=55]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 5), ENSURE_REQUIREMENTS, [plan_id=68]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/2015-su

In [17]:
# Largest `count` value in the view, computed through SQL.
spark.sql("SELECT max(count) from flight_data_2015").take(1)

[Row(max(count)=370002)]

In [18]:
# Largest `count` value, computed through the DataFrame API.
# F.max is referenced explicitly: a bare `max` here only resolves to the Spark
# aggregate when the built-in has been rebound by a pyspark.sql.functions import.
flightData2015.select(F.max("count")).take(1)

[Row(max(count)=370002)]

In [19]:
# SQL formulation: the five destination countries with the largest summed `count`.
maxSql = spark.sql("""
  SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
  FROM flight_data_2015
  GROUP BY DEST_COUNTRY_NAME
  ORDER BY sum(count) DESC
  LIMIT 5
  """)
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [20]:
# DataFrame formulation of the top-five destinations by summed `count`.
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
    .show()
)

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [21]:
# Same top-five query as the DataFrame version, but print its physical plan
# instead of executing it.
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
    .explain()
)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#113L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#17,destination_total#113L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[sum(count#19)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 5), ENSURE_REQUIREMENTS, [plan_id=238]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_sum(count#19)])
            +- FileScan csv [DEST_COUNTRY_NAME#17,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




**Climate Change: Project Tasmania**


In [22]:
# Load the per-country land temperature records; the header row supplies the
# column names and column types are inferred from the data.
temp = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/GlobalLandTemperatures_GlobalLandTemperaturesByCountry.csv")
)

In [23]:
# Preview the first four temperature rows.
temp.show(4)

+----------+------------------+-----------------------------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|Country|
+----------+------------------+-----------------------------+-------+
|1743-11-01|4.3839999999999995|                        2.294|  Åland|
|1743-12-01|              null|                         null|  Åland|
|1744-01-01|              null|                         null|  Åland|
|1744-02-01|              null|                         null|  Åland|
+----------+------------------+-----------------------------+-------+
only showing top 4 rows



In [24]:
# Register the temperature data as the temporary view `temp` for SQL queries.
temp.createOrReplaceTempView("temp")

**a**. For which country, and in which year, was the highest average temperature observed?

In [25]:
# SQL formulation: sort by AverageTemperature descending and keep the first row,
# i.e. the single record with the highest average temperature.
high_avg_temp_1 = spark.sql("""
    SELECT *
    FROM temp
    ORDER BY AverageTemperature DESC
    LIMIT 1
""").first()

In [26]:
# Report the country and calendar year of the hottest record found via SQL.
country = high_avg_temp_1["Country"]
print(country)

# Deliberately not named `year`: assigning to that name would overwrite the
# pyspark.sql.functions.year function imported at the top of the notebook.
hottest_year = high_avg_temp_1['dt'].year
print(hottest_year)

Bahrain
2012


In [27]:
# DataFrame formulation: sort by AverageTemperature descending, keep only the
# date and country columns, and take the top row.
high_avg_temp_2 = (
    temp
    .orderBy(col("AverageTemperature").desc())
    .select("dt", "Country")
    .first()
)

In [28]:
# Report the country and calendar year of the hottest record found via the
# DataFrame API.
country = high_avg_temp_2["Country"]
print(country)

# Deliberately not named `year`: assigning to that name would overwrite the
# pyspark.sql.functions.year function imported at the top of the notebook.
hottest_year = high_avg_temp_2['dt'].year
print(hottest_year)

Bahrain
2012


**b**. Analyze the data by country over the years, and name the top 10 countries with the biggest change in average temperature.

In [29]:
# Per-country spread between the highest and lowest recorded AverageTemperature.
# F.max / F.min are referenced explicitly so the aggregation does not depend on
# the built-in max/min having been rebound by a pyspark.sql.functions import.
# NOTE(review): this is the max-min range over individual readings, so it may
# mostly reflect seasonal swing rather than a long-term trend; confirm that
# this is the intended definition of "change".
avg_temp_change = (
    temp
    .filter(temp["AverageTemperature"].isNotNull())
    .groupBy("Country")
    .agg(
        F.max("AverageTemperature").alias("MaxTemp"),
        F.min("AverageTemperature").alias("MinTemp"),
    )
    .withColumn("TempChange", col("MaxTemp") - col("MinTemp"))
)

# Ten largest spreads, biggest first.
countries = (
    avg_temp_change
    .select("Country", "TempChange")
    .orderBy(col("TempChange").desc())
    .limit(10)
)

countries.show()

+--------------+------------------+
|       Country|        TempChange|
+--------------+------------------+
|        Canada|            43.532|
|       Finland|            40.332|
|       Belarus|            39.338|
|       Estonia|38.882999999999996|
|     Greenland|            37.997|
|       Denmark|37.528999999999996|
|       Armenia|            35.566|
|          Asia|33.992000000000004|
|       Hungary|            33.766|
|Czech Republic|            33.708|
+--------------+------------------+



**Second Dataset**

In [30]:
# Load the per-country CO2-emissions-per-capita table; the header row supplies
# the column names and column types are inferred from the data.
co2_emissions = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/CO2 emissions per capita per country.csv")
)

In [31]:
# Preview the first five rows of the wide (one column per year) CO2 table.
co2_emissions.show(5)

+------------+------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+----+----+----+----+
|Country Name|Country Code|       1960|       1961|       1962|       1963|       1964|       1965|       1966|       1967|       1968|       1969|       1970|       1971|       1972|       1973|       1974|       1975|       1976|       1977|       1978|       1979|       1980|       1981| 

In [32]:
# Reshape the wide CO2 table (one column per year, 1960-2014) into long form
# with one row per (country, year).
# The year columns are selected under their own names; no backtick-quoted
# aliases are introduced, so nothing has to be stripped from "Year" afterwards.
year_columns = [str(y) for y in range(1960, 2015)]
filtered_co2_emissions = co2_emissions.select(
    "Country Name",
    "Country Code",
    *[col(name) for name in year_columns],
)

# The melt is done in pandas, so the selected data is collected to the driver.
df = filtered_co2_emissions.toPandas()

trans_df = df.melt(id_vars=["Country Name", "Country Code"], var_name="Year", value_name="CO2_Emissions_per_capita")

# "Year" comes from the former column labels, so its values are strings such
# as "1960".
transposed_co2_emissions = spark.createDataFrame(trans_df)

transposed_co2_emissions.show(5)

+------------+------------+----+------------------------+
|Country Name|Country Code|Year|CO2_Emissions_per_capita|
+------------+------------+----+------------------------+
|       Aruba|         ABW|1960|                     NaN|
| Afghanistan|         AFG|1960|             0.046059897|
|      Angola|         AGO|1960|             0.097471604|
|     Albania|         ALB|1960|             1.258194928|
|     Andorra|         AND|1960|                     NaN|
+------------+------------+----+------------------------+
only showing top 5 rows



In [34]:
# Imported here explicitly so this cell gets the Spark `year` function even if
# the name `year` has been rebound earlier in the session.
from pyspark.sql.functions import year

# Tag every temperature reading with the calendar year of its date.
temp_data = temp.withColumn("Year", year("dt"))

# Keep only readings from 1960 through 2014, the years selected from the CO2 table.
temp_filtered = temp_data.filter(
    (year(temp_data["dt"]) >= 1960) & (year(temp_data["dt"]) <= 2014)
)

# Match CO2 rows to temperature rows on country name and calendar year.
join_condition = (
    (transposed_co2_emissions["Country Name"] == temp_filtered["Country"])
    & (transposed_co2_emissions["Year"] == year(temp_filtered["dt"]))
)

# After the inner join, drop the temperature-side country/date columns and the
# CO2-side Year, leaving the Year column derived from the temperature data.
merged_dataset = (
    transposed_co2_emissions
    .join(temp_filtered, join_condition, "inner")
    .drop(temp_filtered["Country"])
    .drop(temp_filtered["dt"])
    .drop(transposed_co2_emissions["Year"])
)

In [35]:
# Preview the first five rows of the joined CO2 / temperature dataset.
merged_dataset.show(5)

+------------+------------+------------------------+------------------+-----------------------------+----+
|Country Name|Country Code|CO2_Emissions_per_capita|AverageTemperature|AverageTemperatureUncertainty|Year|
+------------+------------+------------------------+------------------+-----------------------------+----+
|       Aruba|         ABW|                     NaN|            26.629|                        0.255|1960|
|       Aruba|         ABW|                     NaN|            28.465|                        0.273|1960|
|       Aruba|         ABW|                     NaN|28.883000000000003|          0.34600000000000003|1960|
|       Aruba|         ABW|                     NaN|29.430999999999997|                        0.575|1960|
|       Aruba|         ABW|                     NaN|            29.267|                        0.613|1960|
+------------+------------+------------------------+------------------+-----------------------------+----+
only showing top 5 rows



In [37]:
# Keep only rows where both measurements are actually present.
# Missing CO2 values appear as NaN and missing temperatures as NULL in the
# previews earlier in the notebook; isnan() does not flag NULL, so both
# conditions are checked for each column.
valid_dataset = merged_dataset.filter(
    F.col("CO2_Emissions_per_capita").isNotNull()
    & ~F.isnan("CO2_Emissions_per_capita")
    & F.col("AverageTemperature").isNotNull()
    & ~F.isnan("AverageTemperature")
)

# Pearson correlation between per-capita CO2 emissions and AverageTemperature.
# NOTE(review): the printed label says "temperature change", but the value is
# computed against the temperature level itself; confirm the intended wording.
correlation_co2_temp = valid_dataset.select(
    F.corr("CO2_Emissions_per_capita", "AverageTemperature")
).first()[0]

print("Correlation between CO2 emissions and temperature change:", correlation_co2_temp)

Correlation between CO2 emissions and temperature change: -0.41632955285479006
