In [None]:
#Step 1 - Import required modules and dependencies.
import sys
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import * 
from pyspark.sql.types import *

In [None]:
# Step 2 - Create (or get) the Apache Spark session for this application, named "fare-data".
spark = (
    SparkSession.builder
    .appName("fare-data")
    .getOrCreate()
)

In [None]:
# Step 3 - Replace BUCKETNAME below with the S3 bucket name from the previous lab step,
# e.g. 'fare-data-ingest-1234567-us-east-1'. Later cells build their s3:// paths from it.
dataBucket: str = 'BUCKETNAME'

In [None]:
# Step 4 - Read the fare data CSV from the data lake into a Spark DataFrame, keeping only the
# columns used later. header=True takes column names from the first row; with
# inferSchema=False every column is loaded as a string.
df = (
    spark.read
    .csv(
        "s3://" + dataBucket + "/data/ingest/fare_data.csv",
        header=True,
        inferSchema=False,
    )
    .select(
        'transaction_date',
        'fare',
        'pickup_time',
        'duration_sec',
        'car_number',
        'pickup_location',
        'dropoff_location',
    )
)
# Preview 10 rows in ascending transaction_date order.
# NOTE(review): this is string ordering; it matches chronological order only if the dates are
# zero-padded year-first (e.g. yyyy-mm-dd) — confirm the file's date format.
df.orderBy(df.transaction_date.asc()).show(10)

In [None]:
# Step 5 - Look up the total number of records in the file.
# The string built on the last line is the cell's result, so Jupyter displays it as output.
"Total number of records: " + str(df.count())

In [None]:
#Step 6 - Using Apache Spark, find rides where duration was longer than 5 minutes (300 seconds).
# Step 4 reads the CSV with inferSchema=False, so duration_sec is a string column. Sorting the
# raw strings would be lexicographic ("900" ranks above "1200"), so the column is cast to a
# number explicitly for both the filter and the descending sort (longest rides first).
dfVol = (
    df.filter(df.duration_sec.cast("double") > 300)
      .sort(df.duration_sec.cast("double").desc())
)
dfVol.show(10)

In [None]:
#Step 7 - As an alternative to Spark, use SQL to query the data within the cluster.
#Register df as a temporary view called fare_data_sql so Step 8 can query it with spark.sql.
df.createOrReplaceTempView("fare_data_sql")

In [None]:
#Step 8 - Run a SQL query on the fare_data_sql temporary view.
# Step 4 reads the CSV with inferSchema=False, so fare and duration_sec are string columns.
# They are CAST to DOUBLE explicitly. Without the cast, ORDER BY fare DESC would sort the
# text lexicographically ("99.00" ranks above "150.00"), and LIMIT 10 would keep the wrong rows.
dfSql = spark.sql("""
    SELECT *
    FROM fare_data_sql
    WHERE CAST(duration_sec AS DOUBLE) > 600
      AND pickup_location = 'Data_Analytics_Island'
      AND dropoff_location = 'Cloud_Isle'
      AND CAST(fare AS DOUBLE) > 15.00
    ORDER BY CAST(fare AS DOUBLE) DESC
    LIMIT 10
""")
# Display the ten highest-fare rides re-ordered by car_number, descending.
# NOTE(review): car_number is also a string column, so this order is lexicographic;
# cast it as well if car numbers are purely numeric.
dfSql.sort(dfSql.car_number, ascending=False).show()

In [None]:
# Step 9 - Export the data to a Glue table so others can query it using Athena.
# df is written as Parquet files under s3://<dataBucket>/data/processed/ and saved as the table
# default.rides_table; mode("overwrite") replaces whatever that table/path held before.
(
    df.write
    .format("parquet")
    .mode("overwrite")
    .option("path", "s3://" + dataBucket + "/data/processed/")
    .saveAsTable("default.rides_table")
)