In [1]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.2'
spark_version = 'spark-3.<enter version>'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (91.18                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (91.180% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (91.18                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Waiting for headers] [Co0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                               Get:4 https://developer.download.nvidia.com/comp

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2021-05-28 12:09:20--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2021-05-28 12:09:20 (5.55 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
from pyspark.sql import SparkSession

# Build (or reuse, via getOrCreate) the "CloudETL" session. The downloaded
# PostgreSQL JDBC jar is added to the driver classpath so the JDBC writes
# later in the notebook can load org.postgresql.Driver.
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [9]:
from pyspark import SparkFiles

# Load employees.csv from S3 into a DataFrame.
# addFile fetches the object so Spark can serve it locally; SparkFiles.get then
# resolves the local copy by its base file name.
url = "https://<bucket name>.s3.amazonaws.com/employees.csv"
# Derive the name from the URL so the two can never drift apart.
file_name = url.rsplit("/", 1)[-1]
spark.sparkContext.addFile(url)

# First row is the header; column types are inferred from the data.
df = spark.read.csv(SparkFiles.get(file_name), header=True, inferSchema=True, sep=",")
df.show(10)

+-----------+--------+--------------------+--------------+------+--------+--------------+----------------+--------------------+------------+
|Employee ID|   Email|            Position|Marital Status|Gender|Salaried|Vacation Hours|Sick Leave Hours|  Encrypted Password|    Modified|
+-----------+--------+--------------------+--------------+------+--------+--------------+----------------+--------------------+------------+
|  295847284|    ken0|Chief Executive O...|             S|     M|       1|            99|              69|F01251E5-96A3-448...|6/30/14 0:00|
|  245797967|  terri0|Vice President of...|             S|     F|       1|             1|              20|45E8F437-670D-440...|6/30/14 0:00|
|  509647174|roberto0| Engineering Manager|             M|     M|       1|             2|              21|9BBBFB2C-EFBB-421...|6/30/14 0:00|
|  112457891|    rob0|Senior Tool Designer|             S|     M|       0|            48|              80|59747955-87B8-443...|6/30/14 0:00|
|  695256908|

## Drop incomplete and duplicate rows

In [10]:
# Report the row count before cleaning, then after each cleaning step:
# rows containing nulls are dropped first, then exact duplicate rows.
print(df.count())
for cleaning_step in (lambda frame: frame.dropna(), lambda frame: frame.dropDuplicates()):
    df = cleaning_step(df)
    print(df.count())

292
291
289


## Examine the schema

In [11]:
# Show the column names and the types Spark assigned when reading the CSV,
# to check the inferred schema before renaming and splitting the columns.
df.printSchema()

root
 |-- Employee ID: integer (nullable = true)
 |-- Email: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Marital Status: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Salaried: integer (nullable = true)
 |-- Vacation Hours: integer (nullable = true)
 |-- Sick Leave Hours: integer (nullable = true)
 |-- Encrypted Password: string (nullable = true)
 |-- Modified: string (nullable = true)



## Rename columns

In [12]:
# Source CSV header -> snake_case name for the columns used downstream.
# Columns not listed here keep their original headers.
column_renames = {
    "Employee ID": "employee_id",
    "Email": "email",
    "Marital Status": "marital_status",
    "Gender": "gender",
    "Salaried": "salaried",
    "Encrypted Password": "password",
}
df1 = df
for old_name, new_name in column_renames.items():
    df1 = df1.withColumnRenamed(old_name, new_name)
df1.show(5)

+-----------+---------+--------------------+--------------+------+--------+--------------+----------------+--------------------+------------+
|employee_id|    email|            Position|marital_status|gender|salaried|Vacation Hours|Sick Leave Hours|            password|    Modified|
+-----------+---------+--------------------+--------------+------+--------+--------------+----------------+--------------------+------------+
|  999440576| brandon0|Production Techni...|             M|     M|       0|            22|              31|CA5D1935-44E2-4C4...|6/30/14 0:00|
|  413787783|  mihail0|Production Techni...|             S|     M|       0|            64|              52|03A9BA14-0D5B-467...|6/30/14 0:00|
|   25011600|samantha0|Production Techni...|             M|     F|       0|             7|              23|1629EDDD-D0AE-4CB...|6/30/14 0:00|
|  974026903|  ovidiu0|Senior Tool Designer|             S|     M|       0|             7|              23|F68C7C19-FAC1-438...|6/30/14 0:00|
|  153

## Create a new DataFrame for employee info

In [13]:
# Personal details only; the password column goes to its own table.
employee_personal_info = df1.select(
    "employee_id", "email", "marital_status", "gender", "salaried"
)
employee_personal_info.show(5)

+-----------+---------+--------------+------+--------+
|employee_id|    email|marital_status|gender|salaried|
+-----------+---------+--------------+------+--------+
|  999440576| brandon0|             M|     M|       0|
|  413787783|  mihail0|             S|     M|       0|
|   25011600|samantha0|             M|     F|       0|
|  974026903|  ovidiu0|             S|     M|       0|
|  153288994|  houman0|             M|     M|       0|
+-----------+---------+--------------+------+--------+
only showing top 5 rows



## Write the employee info DataFrame to RDS

In [14]:
# Configuration for RDS instance.
# Connection details come from environment variables (falling back to the
# template placeholders), and the password is prompted for when not set, so no
# real credential ever has to be typed into, or saved with, the notebook.
from getpass import getpass

# NOTE: "append" inserts every row again each time the write cells are re-run;
# use "overwrite" only if replacing the existing table contents is acceptable.
mode = "append"
jdbc_url = os.environ.get(
    "RDS_JDBC_URL", "jdbc:postgresql://<connection string>:5432/<database-name>"
)
config = {
    "user": os.environ.get("RDS_USER", "root"),
    "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
    "driver": "org.postgresql.Driver",
}

In [15]:
# Write the personal-info DataFrame to its RDS table, using the save mode and
# connection settings from the configuration cell.
employee_personal_info.write.mode(mode).jdbc(
    jdbc_url, "employee_personal_info", properties=config
)

## Create a new DataFrame for employee passwords

In [16]:
# Keep only the id/password pair for the separate password table.
employee_password = df1.select("employee_id", "password")
employee_password.show(5)

+-----------+--------------------+
|employee_id|            password|
+-----------+--------------------+
|  999440576|CA5D1935-44E2-4C4...|
|  413787783|03A9BA14-0D5B-467...|
|   25011600|1629EDDD-D0AE-4CB...|
|  974026903|F68C7C19-FAC1-438...|
|  153288994|9CE8B2E8-9944-4BA...|
+-----------+--------------------+
only showing top 5 rows



## Write the employee password DataFrame to RDS

In [17]:
# Write the password DataFrame to its RDS table, using the save mode and
# connection settings from the configuration cell.
employee_password.write.mode(mode).jdbc(
    jdbc_url, "employee_password", properties=config
)