In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.3.1'
os.environ['SPARK_VERSION'] = spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [Waiting for headers] [W                                                                               Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [Waiting for headers] [W                                                                               Hit:3 http://security.ubuntu.com/ubuntu focal-security InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [Waiting for headers] [C                                                                               Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  Release
Hit:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.17.jar

--2023-02-02 03:37:47--  https://jdbc.postgresql.org/download/postgresql-42.2.17.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1004734 (981K) [application/java-archive]
Saving to: ‘postgresql-42.2.17.jar.2’


2023-02-02 03:37:47 (4.91 MB/s) - ‘postgresql-42.2.17.jar.2’ saved [1004734/1004734]



In [3]:
from pyspark.sql import SparkSession

# Create (or reuse) the "CloudETL" session with the PostgreSQL JDBC jar
# set as the driver's extra classpath entry
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.17.jar")
    .getOrCreate()
)

In [4]:
# Load the emissions-by-country CSV from the S3 bucket
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/emissions-by-country-bucket/GCB2022v27_MtCO2_flat.csv"
# Register the remote file with Spark, then look up its local path by file name
spark.sparkContext.addFile(url)
emissions_path = SparkFiles.get("GCB2022v27_MtCO2_flat.csv")
emissions_df = spark.read.csv(
    emissions_path,
    sep=",",
    header=True,        # first row holds the column names
    inferSchema=True,   # let Spark infer column types from the data
)
emissions_df.show()

+-----------+------------------+----+-----+----+----+----+------+-------+-----+----------+
|    Country|ISO 3166-1 alpha-3|Year|Total|Coal| Oil| Gas|Cement|Flaring|Other|Per Capita|
+-----------+------------------+----+-----+----+----+----+------+-------+-----+----------+
|Afghanistan|               AFG|1750|  0.0|null|null|null|  null|   null| null|      null|
|Afghanistan|               AFG|1751|  0.0|null|null|null|  null|   null| null|      null|
|Afghanistan|               AFG|1752|  0.0|null|null|null|  null|   null| null|      null|
|Afghanistan|               AFG|1753|  0.0|null|null|null|  null|   null| null|      null|
|Afghanistan|               AFG|1754|  0.0|null|null|null|  null|   null| null|      null|
|Afghanistan|               AFG|1755|  0.0|null|null|null|  null|   null| null|      null|
|Afghanistan|               AFG|1756|  0.0|null|null|null|  null|   null| null|      null|
|Afghanistan|               AFG|1757|  0.0|null|null|null|  null|   null| null|      null|

In [5]:
# Load the emission-percentage-by-type CSV from the S3 bucket
from pyspark import SparkFiles

# The "%" in the file name is percent-encoded as "%25" in the URL;
# the lookup below uses the decoded name "emission%type.csv"
url = "https://emissions-by-country-bucket.s3.amazonaws.com/emission%25type.csv"
spark.sparkContext.addFile(url)
percentages_path = SparkFiles.get("emission%type.csv")
percentages_df = spark.read.csv(
    percentages_path,
    sep=",",
    header=True,        # first row holds the column names
    inferSchema=True,   # let Spark infer column types from the data
)
percentages_df.show()

+-------+----+---------------+--------------+--------------+-----------------+------------------+----------------+
|Country|Year|Percentage_Coal|Percentage_Oil|Percentage_Gas|Percentage_Cement|Percentage_Flaring|Percentage_Other|
+-------+----+---------------+--------------+--------------+-----------------+------------------+----------------+
| Global|2021|    40.35033527|   31.88559108|   21.33892201|      4.505438838|       1.121989123|     0.797723682|
| Global|2019|    39.71133178|   33.29234475|   20.62297865|      4.361907136|       1.184529879|     0.826907801|
| Global|2018|    40.04406622|   33.30757494|   20.44681258|      4.261111186|       1.119073689|     0.821361389|
| Global|2017|    40.18915308|   33.91616024|   19.79383255|      4.177449862|       1.085948991|     0.837455275|
| Global|2014|    42.30622741|   32.82628271|    18.7043746|      4.215897146|        1.09030013|     0.856917999|
| Global|2015|    41.36861869|   33.54934462|   19.08984157|      4.062131973|  

In [32]:
from pyspark.sql.functions import col

# Old column name -> new column name, applied in this order.
# "Country" is intentionally left as-is.
emissions_renames = {
    "ISO 3166-1 alpha-3": "alpha3",
    "Year": "year_",
    "Total": "total",
    "Coal": "coal",
    "Oil": "oil",
    "Gas": "gas",
    "Cement": "cement",
    "Flaring": "flaring",
    "Other": "other",
    "Per Capita": "per_capita",
}

# Start from the raw frame each time so re-running this cell gives the same result
emissions_df2 = emissions_df
for old_name, new_name in emissions_renames.items():
    emissions_df2 = emissions_df2.withColumnRenamed(old_name, new_name)
emissions_df2.printSchema()

root
 |-- Country: string (nullable = true)
 |-- alpha3: string (nullable = true)
 |-- year_: integer (nullable = true)
 |-- total: double (nullable = true)
 |-- coal: double (nullable = true)
 |-- oil: double (nullable = true)
 |-- gas: double (nullable = true)
 |-- cement: double (nullable = true)
 |-- flaring: double (nullable = true)
 |-- other: double (nullable = true)
 |-- per_capita: double (nullable = true)



In [41]:
# Rename columns: "Year" becomes "year_" (matching emissions_df2) and the
# percentage columns are lower-cased. "Country" is left unchanged.
# The previous rename of "Coal" was dropped: percentages_df has no such column,
# so that call never had any effect.
percentages_df2 = (
    percentages_df
    .withColumnRenamed("Year", "year_")
    .withColumnRenamed("Percentage_Coal", "percentage_coal")
    .withColumnRenamed("Percentage_Oil", "percentage_oil")
    .withColumnRenamed("Percentage_Gas", "percentage_gas")
    .withColumnRenamed("Percentage_Cement", "percentage_cement")
    .withColumnRenamed("Percentage_Flaring", "percentage_flaring")
    .withColumnRenamed("Percentage_Other", "percentage_other")
)
percentages_df2.printSchema()

root
 |-- Country: string (nullable = true)
 |-- year_: integer (nullable = true)
 |-- percentage_coal: double (nullable = true)
 |-- percentage_oil: double (nullable = true)
 |-- percentage_gas: double (nullable = true)
 |-- percentage_cement: double (nullable = true)
 |-- percentage_flaring: double (nullable = true)
 |-- percentage_other: double (nullable = true)



In [49]:
# Inner-join the emissions and percentages frames on their shared key columns
join_keys = ["Country", "year_"]
joined_df = emissions_df2.join(percentages_df2, on=join_keys, how="inner")
joined_df.show()


+-----------+-----+------+-----+----+----+----+------+-------+-----+----------+---------------+--------------+--------------+-----------------+------------------+----------------+
|    Country|year_|alpha3|total|coal| oil| gas|cement|flaring|other|per_capita|percentage_coal|percentage_oil|percentage_gas|percentage_cement|percentage_flaring|percentage_other|
+-----------+-----+------+-----+----+----+----+------+-------+-----+----------+---------------+--------------+--------------+-----------------+------------------+----------------+
|Afghanistan| 1750|   AFG|  0.0|null|null|null|  null|   null| null|      null|           null|          null|          null|             null|              null|            null|
|Afghanistan| 1751|   AFG|  0.0|null|null|null|  null|   null| null|      null|           null|          null|          null|             null|              null|            null|
|Afghanistan| 1752|   AFG|  0.0|null|null|null|  null|   null| null|      null|           null|     

In [30]:
from getpass import getpass

# Prompt for the database password instead of hard-coding it in the notebook
password = getpass('Enter database password')

# Connection settings for the PostgreSQL database on RDS
jdbc_url = "jdbc:postgresql://emissions-by-country.cqrhpsosfvfb.us-east-1.rds.amazonaws.com:5432/postgres"
# NOTE(review): with "append", re-running the write cells adds the same rows
# again — confirm this is intended
mode = "append"
config = dict(
    user="postgres",
    password=password,
    driver="org.postgresql.Driver",
)

Enter database password··········


In [34]:
# Persist the renamed emissions frame to the "emissions" table in RDS,
# using the JDBC URL, write mode and credentials configured earlier
emissions_df2.write.jdbc(
    url=jdbc_url,
    table='emissions',
    mode=mode,
    properties=config,
)

In [43]:
# Persist the renamed percentages frame to the "percentages" table in RDS,
# using the JDBC URL, write mode and credentials configured earlier
percentages_df2.write.jdbc(
    url=jdbc_url,
    table='percentages',
    mode=mode,
    properties=config,
)

In [50]:
# Persist the joined emissions/percentages frame to the "joined" table in RDS,
# using the JDBC URL, write mode and credentials configured earlier
joined_df.write.jdbc(
    url=jdbc_url,
    table='joined',
    mode=mode,
    properties=config,
)