In [1]:
import pyspark

In [2]:
pyspark.__version__

'3.3.2'

### Question 1: Spark version -> '3.3.2'

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder \
        .master('local[*]') \
        .appName('test') \
        .getOrCreate()
        

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/03/07 00:05:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Getting Yellow 2024-10 data from the official website:

In [5]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-03-07 00:05:26--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 18.155.128.222, 18.155.128.46, 18.155.128.187, ...
connected. to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|18.155.128.222|:443... 
200 OKequest sent, awaiting response... 
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet’


2025-03-07 00:05:27 (83.5 MB/s) - ‘yellow_tripdata_2024-10.parquet’ saved [64346071/64346071]



In [74]:
df = spark.read.parquet('yellow_tripdata_2024-10.parquet')

### Repartition and write to the results folder

In [8]:
df.repartition(4).write.parquet('./results/', mode='overwrite')

                                                                                

### Question 2: Get partition's size -> 25MB

In [11]:
!ls -lh ./results

total 100M
-rw-r--r-- 1 ovorobiov ovorobiov   0 Mar  7 00:12 _SUCCESS
-rw-r--r-- 1 ovorobiov ovorobiov 25M Mar  7 00:12 part-00000-845edd6c-7a90-4954-bf44-9d901fe12b69-c000.snappy.parquet
-rw-r--r-- 1 ovorobiov ovorobiov 25M Mar  7 00:12 part-00001-845edd6c-7a90-4954-bf44-9d901fe12b69-c000.snappy.parquet
-rw-r--r-- 1 ovorobiov ovorobiov 25M Mar  7 00:12 part-00002-845edd6c-7a90-4954-bf44-9d901fe12b69-c000.snappy.parquet
-rw-r--r-- 1 ovorobiov ovorobiov 25M Mar  7 00:12 part-00003-845edd6c-7a90-4954-bf44-9d901fe12b69-c000.snappy.parquet


### Question 3: How many taxi trips were there on the 15th of October? -> 128893

In [75]:
df.createOrReplaceTempView('yellow')

In [76]:
spark.sql("""
SELECT DATE(tpep_pickup_datetime),
count(1) as trips
FROM yellow
WHERE DATE(tpep_pickup_datetime) = '2024-10-15'
GROUP BY 1
""").show()

[Stage 48:>                                                         (0 + 2) / 2]

+--------------------+------+
|tpep_pickup_datetime| trips|
+--------------------+------+
|          2024-10-15|128893|
+--------------------+------+



                                                                                

### Question 4: What is the length of the longest trip in the dataset in hours? -> 162.61777777777777

In [34]:
from pyspark.sql import functions as F

In [77]:
df = df.withColumn('trip_duration', (F.unix_timestamp(df.tpep_dropoff_datetime)-F.unix_timestamp(df.tpep_pickup_datetime))/3600)

In [82]:
df.repartition(4).write.parquet('./results/', mode='overwrite')

                                                                                

In [83]:
df = spark.read.parquet('./results/')
df.orderBy(df.trip_duration.desc()).show(1)

[Stage 57:>                                                         (0 + 2) / 2]

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|     trip_duration|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       2| 2024-10-16 13:03:49|  2024-10-23 07:40:53|              1|        32.37|         3|                 N|          48|    

                                                                                

### Question 5: Spark’s User Interface which shows the application's dashboard runs on which local port? -> 4040

### Question 6: Least frequent pickup location zone -> Governor's Island/Ellis Island/Liberty Island

In [65]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-03-07 01:13:12--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 18.244.115.167, 18.244.115.220, 18.244.115.107, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|18.244.115.167|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv’


2025-03-07 01:13:12 (140 MB/s) - ‘taxi_zone_lookup.csv’ saved [12331/12331]



In [71]:
df_zones = spark.read.option('header', True).csv('taxi_zone_lookup.csv')

In [84]:
df_results = df.join(df_zones, df.PULocationID == df_zones.LocationID)

In [88]:
df_results.drop('LocationID').write.parquet('/tmp/results')

                                                                                

In [89]:
df_final = spark.read.parquet('/tmp/results/')

In [90]:
df_final.createOrReplaceTempView('yellow_f')

In [95]:
spark.sql("""
SELECT zone,
count(1) as trips
FROM yellow_f
GROUP BY 1
ORDER BY 2 ASC
""").show(1)



+--------------------+-----+
|                zone|trips|
+--------------------+-----+
|Governor's Island...|    1|
+--------------------+-----+
only showing top 1 row



                                                                                