# Print pyspark version and module location

In [1]:
import pyspark
from pyspark.sql import SparkSession

print(f"PySpark version: {pyspark.__version__}")
print(f"Module location: {pyspark.__file__}")

PySpark version: 3.3.2
Module location: /home/popoyi/ssdbox/fact/utilhub/spark-home/spark-3.3.2-bin-hadoop3/python/pyspark/__init__.py


# Initializes and returns a SparkSession configured for local use.

In [2]:
spark = SparkSession.builder \
        .appName("FHV Trip Data Analysis") \
        .master("local[6]") \
        .getOrCreate()

24/03/04 11:00:21 WARN Utils: Your hostname, vyaakar-labs01 resolves to a loopback address: 127.0.1.1; using 192.168.0.104 instead (on interface wlp7s0)
24/03/04 11:00:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/04 11:00:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/04 11:00:22 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Define the filepath to the CSV file

In [3]:
file_path = "./assign01-fhv-ds/raw/fhv_tripdata_2019-10.csv.gz"

# Read the CSV file into a DataFrame using inferSchema to infer data types

In [4]:
df = spark.read.csv(file_path, header=True, inferSchema=True)

                                                                                

# Print the first 10 rows of the DataFrame

In [5]:
df.show(10)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
|     B00021         |2019-10-01 00:00:4

In [6]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [7]:
# Print the number of rows and columns
num_rows = df.count()
num_columns = len(df.columns)

print(f"Number of Rows: {num_rows}")
print(f"Number of Columns: {num_columns}")

Number of Rows: 1897493
Number of Columns: 7


                                                                                

In [8]:
# Rename columns
df_renamed = df.withColumnRenamed("dispatching_base_num", "dispatching_base_number") \
               .withColumnRenamed("pickup_datetime", "pickup_time") \
               .withColumnRenamed("dropOff_datetime", "dropoff_time") \
               .withColumnRenamed("PUlocationID", "pickup_location_id") \
               .withColumnRenamed("DOlocationID", "dropoff_location_id") \
               .withColumnRenamed("SR_Flag", "special_request_flag") \
               .withColumnRenamed("Affiliated_base_number", "affiliated_base_number")

# Show the schema of the DataFrame after renaming the columns to confirm the changes
df_renamed.printSchema()


root
 |-- dispatching_base_number: string (nullable = true)
 |-- pickup_time: timestamp (nullable = true)
 |-- dropoff_time: timestamp (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- special_request_flag: string (nullable = true)
 |-- affiliated_base_number: string (nullable = true)



In [9]:
# Repartition the DataFrame to 6 partitions
df_repartitioned = df.repartition(6)

# Save the repartitioned DataFrame to a Parquet file
parquet_path = "./assign01-fhv-ds/pq"
df_repartitioned.write.mode("overwrite").parquet(parquet_path)

                                                                                

In [10]:
# Load the DataFrame from a Parquet file
df_from_parquet = spark.read.parquet(parquet_path)

# Print the schema of the loaded DataFrame
df_from_parquet.printSchema()
df_from_parquet.show(5)

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02803|2019-10-24 14:00:26|2019-10-24 14:19:29|         242|         254|   null|                B02803|
|              B01145|2019-10-12 10:32:20|2019-10-12 10:36:19|         264|         213|   null|                B02971|
|              B01779|2019-10-04 10:30:00|2019-10-04 10:51

In [11]:
# Rename columns
df_from_parquet = df_from_parquet.withColumnRenamed("dispatching_base_num", "dispatching_base_number") \
               .withColumnRenamed("pickup_datetime", "pickup_time") \
               .withColumnRenamed("dropOff_datetime", "dropoff_time") \
               .withColumnRenamed("PUlocationID", "pickup_location_id") \
               .withColumnRenamed("DOlocationID", "dropoff_location_id") \
               .withColumnRenamed("SR_Flag", "special_request_flag") \
               .withColumnRenamed("Affiliated_base_number", "affiliated_base_number")

# Show the schema of the DataFrame after renaming the columns to confirm the changes
df_from_parquet.printSchema()
df_from_parquet.show(5)

root
 |-- dispatching_base_number: string (nullable = true)
 |-- pickup_time: timestamp (nullable = true)
 |-- dropoff_time: timestamp (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- special_request_flag: string (nullable = true)
 |-- affiliated_base_number: string (nullable = true)

+-----------------------+-------------------+-------------------+------------------+-------------------+--------------------+----------------------+
|dispatching_base_number|        pickup_time|       dropoff_time|pickup_location_id|dropoff_location_id|special_request_flag|affiliated_base_number|
+-----------------------+-------------------+-------------------+------------------+-------------------+--------------------+----------------------+
|                 B02803|2019-10-24 14:00:26|2019-10-24 14:19:29|               242|                254|                null|                B02803|
|                 B01145|2019-10-12 10:32

### Count Records

In [27]:
from pyspark.sql.functions import dayofmonth, month, col

filtered_df = df_from_parquet.filter(
    (dayofmonth(col("pickup_time")) == 15) & 
    (month(col("pickup_time")) == 10)
)

record_count = filtered_df.count()
print(f"Number of records on 15th of October: {record_count}")


Number of records on 15th of October: 62610


In [12]:
# Create a temporary view
df_from_parquet.createOrReplaceTempView("trips")

# Execute SQL Query to find the longest trip duration for each day
longest_trip_query = """
SELECT 
    YEAR(pickup_time) AS year, 
    MONTH(pickup_time) AS month, 
    DAYOFMONTH(pickup_time) AS day, 
    MAX((UNIX_TIMESTAMP(dropoff_time) - UNIX_TIMESTAMP(pickup_time)) / 3600) AS longest_trip_hours
FROM 
    trips
GROUP BY 
    YEAR(pickup_time), 
    MONTH(pickup_time), 
    DAYOFMONTH(pickup_time)
ORDER BY 
    longest_trip_hours
DESC
"""

# Display the Results
longest_trip_per_day = spark.sql(longest_trip_query)
longest_trip_per_day.show(10)


+----+-----+---+------------------+
|year|month|day|longest_trip_hours|
+----+-----+---+------------------+
|2019|   10| 11|          631152.5|
|2019|   10| 28|          631152.5|
|2019|   10| 31| 87672.44083333333|
|2019|   10|  1| 70128.02805555555|
|2019|   10| 17|            8794.0|
|2019|   10| 26| 8784.166666666666|
|2019|   10| 30|1464.5344444444445|
|2019|   10| 25|1056.8266666666666|
|2019|   10|  2| 769.2313888888889|
|2019|   10| 23| 745.6166666666667|
+----+-----+---+------------------+
only showing top 10 rows



In [13]:
taxi_zone_file_path = "./assign01-fhv-ds/raw/taxi_zone_lookup.csv"

taxi_zone_df = spark.read.csv(taxi_zone_file_path, header=True, inferSchema=True)


In [14]:
# Rename columns
taxi_zone_df = taxi_zone_df.withColumnRenamed("LocationID", "location_id") \
               .withColumnRenamed("Borough", "borough") \
               .withColumnRenamed("Zone", "zone") \
               .withColumnRenamed("service_zone", "service_zone")


In [15]:
taxi_zone_df.printSchema()

taxi_zone_df.show(5)

root
 |-- location_id: integer (nullable = true)
 |-- borough: string (nullable = true)
 |-- zone: string (nullable = true)
 |-- service_zone: string (nullable = true)

+-----------+-------------+--------------------+------------+
|location_id|      borough|                zone|service_zone|
+-----------+-------------+--------------------+------------+
|          1|          EWR|      Newark Airport|         EWR|
|          2|       Queens|         Jamaica Bay|   Boro Zone|
|          3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|          4|    Manhattan|       Alphabet City| Yellow Zone|
|          5|Staten Island|       Arden Heights|   Boro Zone|
+-----------+-------------+--------------------+------------+
only showing top 5 rows



### Create Temporary Views

In [16]:
df_from_parquet.createOrReplaceTempView("fhv_trips")
taxi_zone_df.createOrReplaceTempView("taxi_zones")


### Join DataFrames

In [21]:
join_query = """
SELECT 
    t.zone AS pickup_zone_name,
    COUNT(*) AS trip_count
FROM 
    fhv_trips f
JOIN 
    taxi_zones t ON f.pickup_location_id = t.location_id
GROUP BY 
    pickup_zone_name
"""

# Creating a new DataFrame by executing the join query
fhv_trip_and_zone_df = spark.sql(join_query)

# Persist the joined DataFrame as a temporary view for further analysis
fhv_trip_and_zone_df.createOrReplaceTempView("fhv_trip_and_zone_df")


In [22]:
fhv_trip_and_zone_df.printSchema()

fhv_trip_and_zone_df.show(5)


root
 |-- pickup_zone_name: string (nullable = true)
 |-- trip_count: long (nullable = false)

+--------------------+----------+
|    pickup_zone_name|trip_count|
+--------------------+----------+
|           Homecrest|      1295|
|              Corona|      7175|
|    Bensonhurst West|      1880|
|         Westerleigh|      1317|
|Charleston/Totten...|      2533|
+--------------------+----------+
only showing top 5 rows



In [None]:
### Find Least Frequent Pickup Location

In [26]:
least_frequent_query = """
SELECT 
    pickup_zone_name,
    trip_count
FROM 
    fhv_trip_and_zone_df
ORDER BY 
    trip_count ASC
"""

least_frequent_pickup_zone = spark.sql(least_frequent_query)
least_frequent_pickup_zone.show(10)


+--------------------+----------+
|    pickup_zone_name|trip_count|
+--------------------+----------+
|         Jamaica Bay|         1|
|Governor's Island...|         2|
| Green-Wood Cemetery|         5|
|       Broad Channel|         8|
|     Highbridge Park|        14|
|        Battery Park|        15|
|Saint Michaels Ce...|        23|
|Breezy Point/Fort...|        25|
|Marine Park/Floyd...|        26|
|        Astoria Park|        29|
+--------------------+----------+
only showing top 10 rows



# Stop the SparkSession when finished

In [19]:
# spark.stop()