In [None]:
# OPEN FILE IN GOOGLE COLAB

import os

spark_version = 'spark-3.0.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
# Download the PostgreSQL JDBC driver jar (version 42.2.9) into the current
# working directory so it can be placed on the Spark driver classpath.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the "CloudETL" session with the PostgreSQL JDBC jar
# added to the driver's classpath.
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [None]:
# Register the CSV hosted in the S3 bucket with the SparkContext so a copy
# is fetched for this job.
from pyspark import SparkFiles
url="https://trump-twitter-bucket.s3.us-east-2.amazonaws.com/trump_archive.csv"
spark.sparkContext.addFile(url)

# Resolve the fetched copy's path and parse it, taking column names from
# the header row and letting Spark infer column types.
csv_path = SparkFiles.get("trump_archive.csv")
trump_archive_df = spark.read.csv(csv_path, sep=",", header=True, inferSchema=True)

# Preview the loaded rows
trump_archive_df.show()

In [None]:
# Build summary statistics for the loaded frame.
# NOTE(review): describe() returns another DataFrame, so this cell only
# displays that frame's repr; append .show() if the statistics themselves
# are meant to be printed — confirm intent.
trump_archive_df.describe()

In [None]:
# Keep only the three columns used downstream: text, favorites, retweets.
clean_trump_archive_df = trump_archive_df.select("text", "favorites", "retweets")

In [None]:
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import IntegerType, StructField, StructType, StringType


# Cast both count columns to IntegerType so they are integer-typed in the
# frame that gets written out.
clean_trump_archive_df = clean_trump_archive_df.withColumn("favorites", clean_trump_archive_df["favorites"].cast(IntegerType()))
clean_trump_archive_df = clean_trump_archive_df.withColumn("retweets", clean_trump_archive_df.retweets.cast(IntegerType()))

# Show the (column name, type) pairs to confirm the casts. The attribute is
# `dtypes`; `dtype` does not exist on a Spark DataFrame and raises
# AttributeError.
clean_trump_archive_df.dtypes

In [None]:
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://trump-twitter-db.cyjaclylpg2n.us-east-2.rds.amazonaws.com:5432/trump_archive_db"
# The password is read from the RDS_PASSWORD environment variable so it is
# never saved in the notebook; when the variable is unset this falls back to
# the empty string. `os` is imported in the setup cell at the top.
config = {
    "user": "postgres",
    "password": os.environ.get("RDS_PASSWORD", ""),
    "driver": "org.postgresql.Driver",
}

In [None]:
# Write the cleaned frame to the `trump_tweets` table over JDBC, using the
# save mode and connection properties configured earlier.
clean_trump_archive_df.write.jdbc(
    url=jdbc_url,
    table="trump_tweets",
    mode=mode,
    properties=config,
)