In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
# Question 1: Execute spark.version

# Build (or reuse) a local Spark session named 'week_6' running on two local threads.
spark = (
    SparkSession.builder
    .master('local[2]')
    .appName('week_6')
    .getOrCreate()
)

# Last expression of the cell, so the notebook displays the version string.
spark.version

#### Answer: 3.3.2

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


26/02/21 05:18:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/02/21 05:18:17 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


'3.3.2'

In [3]:
# Question 2: Yellow November 2025
# Read the November 2025 Yellow into a Spark Dataframe.
# Repartition the Dataframe to 4 partitions and save it to parquet.
# What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)?
# Select the answer which most closely matches.

!curl -Lf https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-11.parquet -o yellow.parquet

taxi = spark.read.parquet('yellow.parquet', header = True, inferSchema = True)
taxi = taxi.repartition(4)
taxi.write.parquet('yellow', 'overwrite')
!ls -la --block-size M yellow/

#### Answer: 28 MB

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 67.8M  100 67.8M    0     0   165M      0 --:--:-- --:--:-- --:--:--  165M




total 111M
drwxr-xr-x 2 ajay ajay  1M Feb 21 05:18 .
drwxrwxr-x 4 ajay ajay  1M Feb 21 05:18 ..
-rw-r--r-- 1 ajay ajay  1M Feb 21 05:18 ._SUCCESS.crc
-rw-r--r-- 1 ajay ajay  1M Feb 21 05:18 .part-00000-afb2ca27-1ffe-4f95-bb23-c13c820b3813-c000.snappy.parquet.crc
-rw-r--r-- 1 ajay ajay  1M Feb 21 05:18 .part-00001-afb2ca27-1ffe-4f95-bb23-c13c820b3813-c000.snappy.parquet.crc
-rw-r--r-- 1 ajay ajay  1M Feb 21 05:18 .part-00002-afb2ca27-1ffe-4f95-bb23-c13c820b3813-c000.snappy.parquet.crc
-rw-r--r-- 1 ajay ajay  1M Feb 21 05:18 .part-00003-afb2ca27-1ffe-4f95-bb23-c13c820b3813-c000.snappy.parquet.crc
-rw-r--r-- 1 ajay ajay  0M Feb 21 05:18 _SUCCESS
-rw-r--r-- 1 ajay ajay 28M Feb 21 05:18 part-00000-afb2ca27-1ffe-4f95-bb23-c13c820b3813-c000.snappy.parquet
-rw-r--r-- 1 ajay ajay 28M Feb 21 05:18 part-00001-afb2ca27-1ffe-4f95-bb23-c13c820b3813-c000.snappy.parquet
-rw-r--r-- 1 ajay ajay 28M Feb 21 05:18 part-00002-afb2ca27-1ffe-4f95-bb23-c13c820b3813-c000.snappy.parquet
-rw-r--r-- 1 ajay ajay 28

                                                                                

In [4]:
# Question 3: Count records
# How many taxi trips were there on the 15th of November?
# Consider only trips that started on the 15th of November.

# Half-open interval on the pickup timestamp: from 2025-11-15 (inclusive) up to 2025-11-16 (exclusive).
taxi.where(
    (F.col('tpep_pickup_datetime') >= '2025-11-15')
    & (F.col('tpep_pickup_datetime') < '2025-11-16')
).count()

#### Answer: 162,604

#### Alternate query:
# taxi.filter(F.date_trunc('day', 'tpep_pickup_datetime') == '2025-11-15').count()
# taxi.filter(F.to_date('tpep_pickup_datetime') == '2025-11-15').count()

                                                                                

162604

In [5]:
# Question 4: Longest trip
# What is the length of the longest trip in the dataset in hours?

# Duration = (dropoff - pickup) in epoch seconds, divided by 3600 to express it in hours.
# Keep only that column, order it from longest to shortest, and fetch the first row.
(
    taxi
    .select(
        (
            (
                F.unix_timestamp(F.col('tpep_dropoff_datetime'))
                - F.unix_timestamp(F.col('tpep_pickup_datetime'))
            ) / 3600
        ).alias('trip_duration')
    )
    .orderBy(F.col('trip_duration').desc())
    .take(1)
)

#### Answer: 90.65

#### Alternate query:
# taxi.withColumn('trip_duration', (F.unix_timestamp(taxi.tpep_dropoff_datetime) - F.unix_timestamp(taxi.tpep_pickup_datetime)) / 3600).\
#     agg({'trip_duration': 'max'}).show()

                                                                                

[Row(trip_duration=90.64666666666666)]

In [6]:
# Question 5: User Interface
# Spark's User Interface which shows the application's dashboard runs on which local port?

# Ask the running SparkContext for its web UI address through the public API
# instead of relying on the private `_activeSession` attribute and its HTML repr.
# 4040 is Spark's default UI port; when it is already in use Spark tries the next
# port (see the "could not bind on port 4040" warning printed at session start-up),
# so the URL shown for a particular run may end in a different port.
spark.sparkContext.uiWebUrl

#### Answer: 4040

In [7]:
# Question 6: Least frequent pickup location zone
# Using the zone lookup data and the Yellow November 2025 data, what is the name of the LEAST frequent pickup location Zone?

!curl -Lf https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv -o zone.csv
zone = spark.read.csv('zone.csv', header = True, inferSchema = True)
merged = taxi.join(zone, zone.LocationID == taxi.PULocationID, 'left').\
                withColumnRenamed('Zone', 'pickup_zone').drop('Borough', 'service_zone', 'LocationID')
merged.groupby('pickup_zone').count().sort('count', ascending = True).head(5)

#### Answer: Arden Heights

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 12331  100 12331    0     0  63039      0 --:--:-- --:--:-- --:--:-- 62913


                                                                                

[Row(pickup_zone='Arden Heights', count=1),
 Row(pickup_zone="Eltingville/Annadale/Prince's Bay", count=1),
 Row(pickup_zone="Governor's Island/Ellis Island/Liberty Island", count=1),
 Row(pickup_zone='Port Richmond', count=3),
 Row(pickup_zone='Green-Wood Cemetery', count=4)]

In [8]:
# Clean up: remove the 'yellow' Parquet output directory and the two downloaded
# files (yellow.parquet, zone.csv) created by the earlier cells.
!rm -rf yellow yellow.parquet zone.csv