In [1]:
import pyspark
from pyspark.sql import SparkSession, types
from pyspark.sql import functions as F
import glob
import os

In [2]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet  -P 'taxi_data/'

--2025-03-02 22:26:18--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
2600:9000:2365:5e00:b:20a5:b140:21, 2600:9000:2365:ba00:b:20a5:b140:21, 2600:9000:2365:7200:b:20a5:b140:21, ...
connected. to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|2600:9000:2365:5e00:b:20a5:b140:21|:443... 
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘taxi_data/yellow_tripdata_2024-10.parquet’


2025-03-02 22:26:29 (10.7 MB/s) - ‘taxi_data/yellow_tripdata_2024-10.parquet’ saved [64346071/64346071]



### Question 1: Install Spark and PySpark
* Install Spark
* Run PySpark
* Create a local spark session
* Execute spark.version.
What's the output?

In [3]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .config("spark.sql.catalogImplementation", "in-memory") \
    .getOrCreate()



25/03/02 22:26:30 WARN Utils: Your hostname, MauPC resolves to a loopback address: 127.0.1.1; using 192.168.1.73 instead (on interface enp5s0)
25/03/02 22:26:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/03/02 22:26:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
pyspark.__version__

'3.3.2'

### Question 2: Yellow October 2024

* Read the October 2024 Yellow into a Spark Dataframe.
* Repartition the Dataframe to 4 partitions and save it to parquet.
* What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)? Select the answer which most closely matches.

In [5]:
df = spark.read \
    .parquet('taxi_data/yellow_tripdata_2024-10.parquet')

In [6]:
df = df.repartition(4)

In [7]:
partitions_path = r'taxi_data/partitions/'
df.write.parquet(partitions_path)

                                                                                

In [8]:
parquet_files = glob.glob(f'{partitions_path}/*.parquet')
for file in parquet_files:
    file_size = os.path.getsize(file)/1024/1024
    print(f'{file}: {round(file_size, 2)} MB')

taxi_data/partitions/part-00000-2c81a725-9088-4623-91b4-d32118c84ae2-c000.snappy.parquet: 24.23 MB
taxi_data/partitions/part-00002-2c81a725-9088-4623-91b4-d32118c84ae2-c000.snappy.parquet: 24.25 MB
taxi_data/partitions/part-00003-2c81a725-9088-4623-91b4-d32118c84ae2-c000.snappy.parquet: 24.22 MB
taxi_data/partitions/part-00001-2c81a725-9088-4623-91b4-d32118c84ae2-c000.snappy.parquet: 24.2 MB


### Question 3: Count records

How many taxi trips were there on the 15th of October?

Consider only trips that started on the 15th of October.

In [9]:
df \
    .withColumn('pickup_date', F.to_date(df.tpep_pickup_datetime))\
    .filter("pickup_date = '2024-10-15'")\
    .count()

128632

### Question 4: Longest trip

What is the length of the longest trip in the dataset in hours?

In [10]:
df \
    .withColumn('duration', (df.tpep_dropoff_datetime.cast('long') - \
                            df.tpep_pickup_datetime.cast('long'))/3600) \
    .orderBy('duration', ascending = False)\
    .select('duration')\
    .limit(5)\
    .show()



+------------------+
|          duration|
+------------------+
|162.61777777777777|
|           143.325|
|137.76055555555556|
|114.83472222222223|
| 89.89833333333333|
+------------------+



                                                                                

### Question 5: User Interface
Spark’s User Interface which shows the application's dashboard runs on which local port?

**Answer:** 4040

### Question 6: Least frequent pickup location zone

Load the zone lookup data into a temp view in Spark:

In [11]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv -P 'taxi_data/'

--2025-03-02 22:26:41--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
2600:9000:2365:1a00:b:20a5:b140:21, 2600:9000:2365:ea00:b:20a5:b140:21, 2600:9000:2365:3c00:b:20a5:b140:21, ...
connected. to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|2600:9000:2365:1a00:b:20a5:b140:21|:443... 
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_data/taxi_zone_lookup.csv’


2025-03-02 22:26:46 (2.79 MB/s) - ‘taxi_data/taxi_zone_lookup.csv’ saved [12331/12331]



Using the zone lookup data and the Yellow October 2024 data, what is the name of the LEAST frequent pickup location Zone?

* Governor's Island/Ellis Island/Liberty Island
* Arden Heights
* Rikers Island
* Jamaica Bay


In [12]:
zones_df = spark.read \
    .option('header', 'true')\
    .csv('taxi_data/taxi_zone_lookup.csv')

In [13]:
df.createOrReplaceTempView('tripdata')
zones_df.createOrReplaceTempView('zones')

In [14]:
spark.sql("""
    SELECT Zone, count(*) as zone_count
    FROM (SELECT a.PULocationID, b.Zone
            FROM tripdata a
            INNER JOIN zones b ON a.PULocationID = b.LocationID)
    GROUP BY Zone
    ORDER BY zone_count asc""").show()

+--------------------+----------+
|                Zone|zone_count|
+--------------------+----------+
|Governor's Island...|         1|
|       Arden Heights|         2|
|       Rikers Island|         2|
|         Jamaica Bay|         3|
| Green-Wood Cemetery|         3|
|Charleston/Totten...|         4|
|       West Brighton|         4|
|   Rossville/Woodrow|         4|
|Eltingville/Annad...|         4|
|       Port Richmond|         4|
|        Crotona Park|         6|
|         Great Kills|         6|
|Heartland Village...|         7|
|     Mariners Harbor|         7|
|Saint George/New ...|         9|
|             Oakwood|         9|
|New Dorp/Midland ...|        10|
|       Broad Channel|        10|
|         Westerleigh|        12|
|     Pelham Bay Park|        12|
+--------------------+----------+
only showing top 20 rows

