
# **Installation**
The first step is to install PySpark; the second is to install the findspark library.

In [5]:
# install pyspark using pip
!pip install --ignore-installed -q pyspark
# install findspark using pip
!pip install --ignore-installed -q findspark

## Mounting Google Drive

Use the `drive` module from `google.colab` to mount your entire Google Drive in Colab by running the code below. Colab then asks you to authenticate: choose the Google account whose Drive you want to mount and allow the requested access to your Google Drive.

In [14]:
# to read in data from a text file, first upload the data file into your google drive and then mount your google drive onto colab
from google.colab import drive
# to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True)
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


Run a sample Spark DataFrame query:

In [21]:
# Import only what this cell uses; a wildcard import from pyspark.sql.functions
# would shadow Python built-ins such as sum, min, max and round
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("Customer").getOrCreate()

inputFilePath = "/content/drive/My Drive/BEAD_DATA/customers.csv"
countryFilePath = "/content/drive/My Drive/BEAD_DATA/Country.json"

df = ( spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(inputFilePath) )

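# JSON records carry their own field names and Spark infers the schema unless one
# is supplied, so the CSV-only "header" and "inferSchema" options are not needed here
df1 = spark.read.json(countryFilePath)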

# df.printSchema()
# df.count()
# df.show()
# df1.show()

(df.select("CustomerId", "Fullname")
  .where("Age > 30 AND MemberCategory ='Gold' ")
  .orderBy("Fullname")
  .show(10,False))



+----------+---------------+
|CustomerId|Fullname       |
+----------+---------------+
|57062     |Amanda Barker  |
|43508     |Anthony Johnson|
|67230     |April Terrell  |
|25216     |Brandon Howe   |
|41351     |Jesse Wilson   |
|82155     |Jessica Duncan |
|84826     |Joseph Mack    |
|18918     |Maria Rodriguez|
|42696     |Marissa Bush   |
|98571     |Mark Crawford  |
+----------+---------------+
only showing top 10 rows



**Day 3 Workshop 4**

Exercise 1:

Using DataFrames and Spark SQL on the Rebu case study files, write Spark SQL queries for the following:

A. Data retrieval using Spark SQL
1. Retrieve all Driver data (use Drivers.CSV)
2. Retrieve all taxis and display the data in ascending order of taxi license plate number.
3. Retrieve all Limousine taxis. Display only the Taxi Number, Taxi Type, and Taxi Colour.
4. Retrieve all 4-seater Premier taxis.

B. Aggregation and Statistical Queries (use BEAD_Rebu_TripData.CSV)
5. Determine the average distance per trip based on ALL trips in the month of January 2024.
6. Find the total fares collected for the Maxi Cab taxi type (grouped by Taxi Type).

C. Analytics Questions
7. Determine the average occupancy, i.e., (Number of Passengers / Passenger Capacity), for Standard taxis.
8. Determine the fares collected by day of the week (i.e., Sun, Mon, Tue) for the month of January 2024.
9. Prepare a tabulation report showing total revenue against two dimensions: hour of the day AND day of the week.
10. Compare the total number of trips per day made by all taxis on weekends with the total number of trips per day on weekdays in January 2024.

D. Joining Multiple Entities and Multiple Formats in a DataFrame
11. Determine the total fares paid by all Gold Status passengers in January 2024. What percentage of the total fares for all customers in January 2024 does this represent?

In [94]:
from pyspark.sql import SparkSession
# Explicit imports; note that this `sum` deliberately shadows Python's built-in sum
from pyspark.sql.functions import (avg, col, count, countDistinct, dayofweek,
                                   month, sum, to_date, year)

spark = SparkSession.builder.appName("RebuCaseStudy").getOrCreate()

# One base path for every file (Colab mounts My Drive at /content/drive/MyDrive)
DATA_DIR = "/content/drive/MyDrive/BEAD_DATA"

# CSV files: the first row holds the column names; infer numeric column types
drivers_df = (spark.read.option("header", "true").option("inferSchema", "true")
              .csv(f"{DATA_DIR}/BEAD_Rebu_Drivers.csv"))
trip_data_df = (spark.read.option("header", "true").option("inferSchema", "true")
                .csv(f"{DATA_DIR}/BEAD_Rebu_TripData.csv"))
passenger_df = (spark.read.option("header", "true").option("inferSchema", "true")
                .csv(f"{DATA_DIR}/BEAD_Rebu_Passengers.csv"))

# JSON file: the schema is inferred automatically; multiline lets one record span several lines
taxis_df = spark.read.option("multiline", "true").json(f"{DATA_DIR}/BEAD_Rebu_TaxiCabs.json")

# Parse the Date column (strings in d-MMM-yy form) into a DateType column
trip_data_df = trip_data_df.withColumn("TripDate", to_date(col("Date"), "d-MMM-yy"))

# dayofweek() returns 1 for Sunday through 7 for Saturday, so 1 and 7 mark weekends
trip_data_df = trip_data_df.withColumn("IsWeekend", dayofweek(col("TripDate")).isin(1, 7))

# Keep only the trips made in January 2024
jan_2024_trips = trip_data_df.filter((month(col("TripDate")) == 1) & (year(col("TripDate")) == 2024))


In [76]:
# A - Data retrieval using Spark SQL
#===================================

# 1. Retrieve all Driver data
drivers_df.show()
# 2. Retrieve all Taxis and display the data in ascending order of Taxi License Plate number
taxis_df.orderBy("TaxiNumber").show()

# 3. Retrieve all Limousine Taxis (Taxi Number, Taxi Type, Taxi Colour)
taxis_df.select("TaxiNumber", "TaxiType", "TaxiColor").where("TaxiType = 'Limosine'").show()

# 4. Retrieve all 4-seater Premier taxis
taxis_df.where("TaxiType = 'Premier' AND TaxiPassengerCapacity = 4").show()

+---+-----------------+-----------+-------------+------+
|Sno|       DriverName|DriverPhone|TaxiIDDriving|Rating|
+---+-----------------+-----------+-------------+------+
|  1|Georgiana Iverson|   38587202|          209|   1.7|
|  2|   Ewell Rolstone|   88675586|          243|   4.9|
|  3|    Pedro Thacker|   94452422|          197|   2.6|
|  4|     Winn Kellard|   81521505|          456|   4.8|
|  5|   Ermin Trounson|   21644415|          372|   4.1|
|  6| Weylin Bernhardi|   89930924|          397|   4.7|
|  7|  Giuseppe Manton|   78503208|          463|   3.3|
|  8| Friedrich De'Ath|   64901517|          264|   4.4|
|  9|  Lauraine Galton|   28736147|          367|   4.7|
| 10|   Debra Willeman|   97189395|          277|   4.6|
| 11| Francene Gavriel|   88137354|           32|   4.9|
| 12|     Eyde Brosini|   34871916|          386|   4.2|
| 13|  Orelia Woolfoot|   28037658|          329|   4.1|
| 14|Christi Middleton|   97577827|          453|   4.8|
| 15|     Jamey Cecely|   48329

In [77]:
# B - Aggregation and Statistical Queries
#========================================
# 5. Determine the average distance per trip based on ALL trips in the month of January 2024
average_distance_jan = jan_2024_trips.agg(avg("Distance Travelled").alias("AverageDistance"))
average_distance_jan.show()

# 6. Filter for Maxi Cab trips and calculate the total fares collected
total_fares_maxi_cab = trip_data_df.filter(trip_data_df["Taxi Type"] == 'Maxi Cab') \
                                   .groupBy("Taxi Type") \
                                   .agg(sum("Trip Fare").alias("TotalFare"))
total_fares_maxi_cab.show()

+-----------------+
|  AverageDistance|
+-----------------+
|11.90267497177833|
+-----------------+

+---------+-----------------+
|Taxi Type|        TotalFare|
+---------+-----------------+
| Maxi Cab|124899.2199999994|
+---------+-----------------+



In [None]:
# C. Analytics
#=============
# 7. Average occupancy (Number Of Passengers / Taxi Capacity) for Standard taxis
average_occupancy = (trip_data_df.filter(col("Taxi Type") == "Standard")
                     .withColumn("Occupancy", col("Number Of Passengers") / col("Taxi Capacity"))
                     .agg(avg("Occupancy").alias("AverageOccupancy")))
average_occupancy.show()

# 8. Total fares collected by day of the week in January 2024,
#    ordered Sun..Sat via dayofweek() instead of alphabetically
fares_by_day_of_week = (jan_2024_trips
                        .groupBy(dayofweek("TripDate").alias("DayNum"), "Day")
                        .agg(sum("Trip Fare").alias("TotalFare"))
                        .orderBy("DayNum"))
fares_by_day_of_week.show()

# 9. Total revenue by hour of the day and day of the week in January 2024
revenue_by_hour_day = (jan_2024_trips.groupBy("Hour of Day", "Day")
                       .agg(sum("Trip Fare").alias("TotalRevenue"))
                       .orderBy("Hour of Day", "Day"))
revenue_by_hour_day.show()

# 10. Average trips per day on weekends vs weekdays in January 2024:
#     trip count divided by the number of distinct dates in each group
trips_weekends_vs_weekdays = (jan_2024_trips.groupBy("IsWeekend")
                              .agg((count("*") / countDistinct("TripDate")).alias("AverageTripsPerDay")))
trips_weekends_vs_weekdays.show()

In [105]:
# D Multiple Entities Joining and multiple formats joined in a DataFrame
#========================================================================

# The passenger file supplies PassengerID, PassengerName, MemSilvererStGoldtus (membership status),
# Age, Gender, AmountSpent, Address, ContactTitle and Phone
# Rename PassengerID to "Passenger ID" so it matches the join key in the trip data
passenger_df = passenger_df.withColumnRenamed("PassengerID", "Passenger ID")

# Join trip data with passenger data on the shared "Passenger ID" column
joined_df = trip_data_df.join(passenger_df, "Passenger ID")
joined_df.show()

# Filter for Gold Status Passengers in January 2024 and calculate total fares
total_fares_gold = joined_df.filter((col("MemSilvererStGoldtus") == "Gold") &
                                    (month(joined_df.TripDate) == 1) &
                                    (year(joined_df.TripDate) == 2024)) \
                            .agg(sum("Trip Fare").alias("TotalFaresGold"))

# Calculate total fares for all passengers in January 2024
total_fares_all = joined_df.filter((month(joined_df.TripDate) == 1) &
                                   (year(joined_df.TripDate) == 2024)) \
                           .agg(sum("Trip Fare").alias("TotalFaresAll"))

total_fares_gold_value = total_fares_gold.collect()[0]["TotalFaresGold"]
total_fares_all_value = total_fares_all.collect()[0]["TotalFaresAll"]

# Calculate the percentage
percentage_gold = (total_fares_gold_value / total_fares_all_value) * 100

print(f"Total fares paid by Gold Status Passengers in January 2024: {total_fares_gold_value}")
print(f"Percentage of total fares for all customers: {percentage_gold:.2f}%")


+------------+---+--------+---+-----------+--------------------+-----------------+-----------------+------------------+------------------------+-------------+-----------+---------+-------------+--------------------+---------+--------------+----------+---------+-------------+--------------------+----+------+-----------+--------------------+------------+--------+
|Passenger ID|Sno|    Date|Day|Hour of Day|Trip Start Time HHMM|  Pickup District| DropOff District|Distance Travelled|Trip Duration in Seconds|Trip End Time|Taxi Number|Taxi Type|Taxi Capacity|Number Of Passengers|Trip Fare|Passenger Name|  TripDate|IsWeekend|PassengerName|MemSilvererStGoldtus| Age|Gender|AmountSpent|             Address|ContactTitle|   Phone|
+------------+---+--------+---+-----------+--------------------+-----------------+-----------------+------------------+------------------------+-------------+-----------+---------+-------------+--------------------+---------+--------------+----------+---------+-----------