In [0]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-04-10 14:59:52--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.2’


2020-04-10 14:59:52 (4.67 MB/s) - ‘postgresql-42.2.9.jar.2’ saved [914037/914037]



In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CoronaETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [4]:
from pyspark import SparkFiles
# Load in employee.csv from S3 into a DataFrame
url = "https://project-pandemix.s3.us-east-2.amazonaws.com/COVID-10+Cases+v2.csv"
spark.sparkContext.addFile(url)

corona_df = spark.read.option('header', 'true').csv(SparkFiles.get("COVID-10+Cases+v2.csv"), inferSchema=True, sep=',', timestampFormat="mm/dd/yy")
corona_df.show(10)

+-------------------+-------------------+--------------------+------+---------+------+
|               Date|            Country|               State|County|Confirmed|Deaths|
+-------------------+-------------------+--------------------+------+---------+------+
|2020-01-22 00:01:00|        Afghanistan|                None|  None|        0|     0|
|2020-01-22 00:01:00|            Albania|                None|  None|        0|     0|
|2020-01-22 00:01:00|            Algeria|                None|  None|        0|     0|
|2020-01-22 00:01:00|            Andorra|                None|  None|        0|     0|
|2020-01-22 00:01:00|             Angola|                None|  None|        0|     0|
|2020-01-22 00:01:00|Antigua and Barbuda|                None|  None|        0|     0|
|2020-01-22 00:01:00|          Argentina|                None|  None|        0|     0|
|2020-01-22 00:01:00|            Armenia|                None|  None|        0|     0|
|2020-01-22 00:01:00|          Australia|Au

In [0]:
from pyspark.sql.types import DateType

In [0]:
corona_df1=corona_df.withColumn("Date_1",corona_df['Date'].cast(DateType()))

In [0]:
corona_new_df=corona_df1.select("Date_1","Country","State","County","Confirmed","Deaths")

In [8]:
corona_new_df.show()

+----------+-------------------+--------------------+------+---------+------+
|    Date_1|            Country|               State|County|Confirmed|Deaths|
+----------+-------------------+--------------------+------+---------+------+
|2020-01-22|        Afghanistan|                None|  None|        0|     0|
|2020-01-22|            Albania|                None|  None|        0|     0|
|2020-01-22|            Algeria|                None|  None|        0|     0|
|2020-01-22|            Andorra|                None|  None|        0|     0|
|2020-01-22|             Angola|                None|  None|        0|     0|
|2020-01-22|Antigua and Barbuda|                None|  None|        0|     0|
|2020-01-22|          Argentina|                None|  None|        0|     0|
|2020-01-22|            Armenia|                None|  None|        0|     0|
|2020-01-22|          Australia|Australian Capita...|  None|        0|     0|
|2020-01-22|          Australia|     New South Wales|  None|    

## Rename columns

In [0]:
def renameCols(df, old_columns, new_columns):
    for old_col,new_col in zip(old_columns,new_columns):
        df = df.withColumnRenamed(old_col,new_col)
    return df

In [0]:
old_columns = ['Date_1','Country',"State","County","Confirmed","Deaths"]
new_columns = ["date", "country","state","county","confirmed","deaths"]


In [0]:
corona_conv_df = renameCols(corona_new_df, old_columns, new_columns)

In [12]:
corona_conv_df.show()

+----------+-------------------+--------------------+------+---------+------+
|      date|            country|               state|county|confirmed|deaths|
+----------+-------------------+--------------------+------+---------+------+
|2020-01-22|        Afghanistan|                None|  None|        0|     0|
|2020-01-22|            Albania|                None|  None|        0|     0|
|2020-01-22|            Algeria|                None|  None|        0|     0|
|2020-01-22|            Andorra|                None|  None|        0|     0|
|2020-01-22|             Angola|                None|  None|        0|     0|
|2020-01-22|Antigua and Barbuda|                None|  None|        0|     0|
|2020-01-22|          Argentina|                None|  None|        0|     0|
|2020-01-22|            Armenia|                None|  None|        0|     0|
|2020-01-22|          Australia|Australian Capita...|  None|        0|     0|
|2020-01-22|          Australia|     New South Wales|  None|    

## Write DataFrame to RDS

In [0]:
# Configure settings for RDS
mode = "overwrite"
jdbc_url="jdbc:postgresql://corona.cuo7ivhfh3jn.us-east-1.rds.amazonaws.com:5432/corona"
config = {"user":"root", 
          "password": "postgress", 
          "driver":"org.postgresql.Driver"}

In [0]:
corona_conv_df.write.jdbc(url=jdbc_url, table='corona_db', mode=mode, properties=config)