In [0]:
# Import standard-library and PySpark dependencies
import urllib
import urllib.parse

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Build the Spark session for this notebook (getOrCreate reuses an active session if one exists)
session_builder = SparkSession.builder.appName("Fetch Data from S3")
spark = session_builder.getOrCreate()

In [0]:
# Format of the credentials file
file_type = 'csv'
# The first row of the file holds the column names
first_row_is_header = 'true'
# Field separator used in the file
delimiter = ','
# Load the AWS access-keys CSV into a Spark DataFrame
aws_keys_reader = spark.read.format(file_type).options(
    header=first_row_is_header,
    sep=delimiter,
)
aws_keys_df = aws_keys_reader.load('/FileStore/PragathiGopishetty_accessKeys.csv')

In [0]:
# Fetch both AWS credentials with a single Spark action instead of two separate collects
aws_keys_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if aws_keys_row is None:
    # first() returns None for an empty DataFrame; fail with a clear message
    # rather than an opaque IndexError
    raise ValueError('No AWS credentials found in the access-keys CSV file')
ACCESS_KEY = aws_keys_row['Access key ID']
SECRET_KEY = aws_keys_row['Secret access key']
# URL-encode the secret key; safe='' also encodes "/" so the key can be embedded in a URL
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe='')

In [0]:
# AWS S3 bucket name
AWS_S3_BUCKET = 'source-nycdata'
# Mount name for the bucket
MOUNT_NAME = '/mnt/source-nycdata'
# Source URL with the credentials embedded (the secret key is already URL-encoded)
SOURCE_URL = 's3n://{0}:{1}@{2}'.format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)
# Mount the bucket only when the mount point is not already in use:
# dbutils.fs.mount raises an error for an existing mount point, which would
# break this cell on every re-run of the notebook.
if not any(mount.mountPoint == MOUNT_NAME for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

Out[28]: True

In [0]:
# List the contents of the mount point to confirm the S3 bucket was mounted
mounted_files = dbutils.fs.ls('/mnt/source-nycdata/')
display(mounted_files)

path,name,size,modificationTime
dbfs:/mnt/source-nycdata/data.csv,data.csv,135985,1680554980000


In [0]:
# Format and location of the source file
file_type = 'csv'
file_location = '/mnt/source-nycdata/data.csv'
# CSV reader settings
delimiter = ','
first_row_is_header = 'true'
infer_schema = 'true'
# These options apply to CSV input; for other file types they will be ignored.
csv_options = {
    'inferSchema': infer_schema,
    'header': first_row_is_header,
    'sep': delimiter,
}
df = spark.read.format(file_type).options(**csv_options).load(file_location)
display(df)

vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
2,2019-07-02T18:44:04.000+0000,2019-07-02T18:59:14.000+0000,1,2.95,1,N,163,246,1,13.0,1.0,0.5,4.0,0.0,0.3,21.3,2.5
2,2019-07-02T18:02:06.000+0000,2019-07-02T18:07:13.000+0000,1,1.02,1,N,239,43,1,6.0,1.0,0.5,2.06,0.0,0.3,12.36,2.5
2,2019-07-02T18:50:51.000+0000,2019-07-02T18:55:47.000+0000,1,0.85,1,N,142,239,1,5.0,1.0,0.5,2.32,0.0,0.3,11.62,2.5
2,2019-07-02T18:57:33.000+0000,2019-07-02T19:32:17.000+0000,1,10.97,1,N,138,158,1,36.5,1.0,0.5,9.38,6.12,0.3,56.3,2.5
2,2019-07-02T17:58:26.000+0000,2019-07-02T18:06:52.000+0000,1,1.07,1,N,236,236,2,7.0,1.0,0.5,0.0,0.0,0.3,11.3,2.5
2,2019-07-02T18:24:52.000+0000,2019-07-02T19:10:47.000+0000,1,10.82,1,N,237,93,1,36.0,1.0,0.5,11.6,6.12,0.3,58.02,2.5
2,2019-07-02T18:52:12.000+0000,2019-07-02T19:10:37.000+0000,2,6.74,1,N,264,264,1,22.5,1.0,0.5,2.5,0.0,0.3,26.8,0.0
2,2019-07-02T18:00:19.000+0000,2019-07-02T18:16:19.000+0000,1,1.69,1,N,143,230,2,11.5,1.0,0.5,0.0,0.0,0.3,15.8,2.5
2,2019-07-02T18:22:57.000+0000,2019-07-02T18:56:23.000+0000,1,10.31,1,N,48,138,1,34.5,1.0,0.5,5.5,0.0,0.3,44.3,2.5
2,2019-07-02T18:20:48.000+0000,2019-07-02T18:25:22.000+0000,1,0.44,1,N,48,143,2,4.5,1.0,0.5,0.0,0.0,0.3,8.8,2.5


In [0]:
# Print the column names and the data types inferred while reading the CSV
df.printSchema()

root
 |-- vendorid: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- ratecodeid: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pulocationid: integer (nullable = true)
 |-- dolocationid: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [0]:
# Show, for every row and column, whether the value is null
null_flags = [col(name).isNull().alias(name) for name in df.columns]
df.select(null_flags).show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|   false|               false|                false|          false|        false|     false|             false|       false|       false|       false|      false|false|  false|     false|       false|                false

In [0]:
# Count the null values in each column.
# NOTE: `sum` here is the Spark aggregate brought in by the star import of
# pyspark.sql.functions, not Python's built-in sum.
null_counts = [
    sum(col(name).isNull().cast("int")).alias(name)
    for name in df.columns
]
df.select(null_counts).show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       0|                   0|                    0|              0|            0|         0|                 0|           0|           0|           0|          0|    0|      0|         0|           0|                    0

In [0]:
# Keep only the trips whose trip distance is greater than zero
df = df.where(df["trip_distance"] > 0)

In [0]:
# Preview the rows that remain after the trip_distance filter
display(df)

vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
2,2019-07-02T18:44:04.000+0000,2019-07-02T18:59:14.000+0000,1,2.95,1,N,163,246,1,13.0,1.0,0.5,4.0,0.0,0.3,21.3,2.5
2,2019-07-02T18:02:06.000+0000,2019-07-02T18:07:13.000+0000,1,1.02,1,N,239,43,1,6.0,1.0,0.5,2.06,0.0,0.3,12.36,2.5
2,2019-07-02T18:50:51.000+0000,2019-07-02T18:55:47.000+0000,1,0.85,1,N,142,239,1,5.0,1.0,0.5,2.32,0.0,0.3,11.62,2.5
2,2019-07-02T18:57:33.000+0000,2019-07-02T19:32:17.000+0000,1,10.97,1,N,138,158,1,36.5,1.0,0.5,9.38,6.12,0.3,56.3,2.5
2,2019-07-02T17:58:26.000+0000,2019-07-02T18:06:52.000+0000,1,1.07,1,N,236,236,2,7.0,1.0,0.5,0.0,0.0,0.3,11.3,2.5
2,2019-07-02T18:24:52.000+0000,2019-07-02T19:10:47.000+0000,1,10.82,1,N,237,93,1,36.0,1.0,0.5,11.6,6.12,0.3,58.02,2.5
2,2019-07-02T18:52:12.000+0000,2019-07-02T19:10:37.000+0000,2,6.74,1,N,264,264,1,22.5,1.0,0.5,2.5,0.0,0.3,26.8,0.0
2,2019-07-02T18:00:19.000+0000,2019-07-02T18:16:19.000+0000,1,1.69,1,N,143,230,2,11.5,1.0,0.5,0.0,0.0,0.3,15.8,2.5
2,2019-07-02T18:22:57.000+0000,2019-07-02T18:56:23.000+0000,1,10.31,1,N,48,138,1,34.5,1.0,0.5,5.5,0.0,0.3,44.3,2.5
2,2019-07-02T18:20:48.000+0000,2019-07-02T18:25:22.000+0000,1,0.44,1,N,48,143,2,4.5,1.0,0.5,0.0,0.0,0.3,8.8,2.5


In [0]:
# List the available column names before choosing which ones to keep
df.columns

Out[56]: ['vendorid',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'ratecodeid',
 'store_and_fwd_flag',
 'pulocationid',
 'dolocationid',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge']

In [0]:
# Keep only the columns needed for the load step
df = df.select(
    "vendorid",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "payment_type",
    "fare_amount",
    "tip_amount",
    "total_amount",
)

In [0]:
# Preview the DataFrame after the column selection
display(df)

vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,payment_type,fare_amount,tip_amount,total_amount
2,2019-07-02T18:44:04.000+0000,2019-07-02T18:59:14.000+0000,1,2.95,1,13.0,4.0,21.3
2,2019-07-02T18:02:06.000+0000,2019-07-02T18:07:13.000+0000,1,1.02,1,6.0,2.06,12.36
2,2019-07-02T18:50:51.000+0000,2019-07-02T18:55:47.000+0000,1,0.85,1,5.0,2.32,11.62
2,2019-07-02T18:57:33.000+0000,2019-07-02T19:32:17.000+0000,1,10.97,1,36.5,9.38,56.3
2,2019-07-02T17:58:26.000+0000,2019-07-02T18:06:52.000+0000,1,1.07,2,7.0,0.0,11.3
2,2019-07-02T18:24:52.000+0000,2019-07-02T19:10:47.000+0000,1,10.82,1,36.0,11.6,58.02
2,2019-07-02T18:52:12.000+0000,2019-07-02T19:10:37.000+0000,2,6.74,1,22.5,2.5,26.8
2,2019-07-02T18:00:19.000+0000,2019-07-02T18:16:19.000+0000,1,1.69,2,11.5,0.0,15.8
2,2019-07-02T18:22:57.000+0000,2019-07-02T18:56:23.000+0000,1,10.31,1,34.5,5.5,44.3
2,2019-07-02T18:20:48.000+0000,2019-07-02T18:25:22.000+0000,1,0.44,2,4.5,0.0,8.8


In [0]:
#Load

# JDBC connection settings for the target PostgreSQL database
driver = "org.postgresql.Driver"
url = "jdbc:postgresql://database-2.cpqa9nsyvdnq.us-east-1.rds.amazonaws.com/"
table = "g_pragathi_nyc.nyc_data"
user = "postgres"
# Never keep the database password in the notebook source: anyone who can read
# the notebook (or its version history) can read the password. Fetch it from a
# Databricks secret scope at run time instead.
# NOTE(review): the scope and key names below are placeholders - create them
# (or change them to the names used in this workspace) before running, and
# rotate the password that was previously committed in this cell.
password = dbutils.secrets.get(scope="nyc-etl", key="postgres-password")

In [0]:
# Append the transformed rows to the PostgreSQL table over JDBC.
# The save mode has to be set with .mode(): passing it as .option("mode", ...)
# does not set the save mode, so the writer stays on the default
# "errorifexists" mode and the write fails as soon as the table already exists.
(
    df.write.format("jdbc")
    .option("driver", driver)
    .option("url", url)
    .option("dbtable", table)
    .option("user", user)
    .option("password", password)
    .mode("append")
    .save()
)