In [1]:
from pathlib import Path

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

**Question 1. Execute spark.version - what's the output?**

Answer:

To do that, we first need to create a `SparkSession` instance - we're going to run Spark locally with 3 worker threads.

In [2]:
# Local-mode configuration: the master URL "local[3]" runs Spark in-process
# with three worker threads.
conf = (
    SparkConf()
    .setAppName("DEZoomCamp Week 5")
    .setMaster("local[3]")
)

# Reuse an already-running session if there is one, otherwise start a new one.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

23/03/06 20:49:18 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.1.108 instead (on interface enp0s31f6)
23/03/06 20:49:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/03/06 20:49:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Now the Spark version is accessible through the `spark.version` attribute.

In [3]:
# Print the version string reported by the active Spark session.
print(spark.version)

3.3.1


**Question 2. Read the High Volume FHV trips for June 2021, then repartition it to 12 partitions and save it in the `parquet` file format. What is the average size of the `parquet` files that were created (in MB)?**

Answer:

In [4]:
# Gzipped CSV with the High Volume FHV trip records for June 2021.
csv_path = Path("/home/michal/Projects/de_zoomcamp_23/data/raw/fhv_tripdata/fhvhv_tripdata_2021-06.csv.gz")

# (column name, Spark type) pairs in the order the schema declares them;
# every column is nullable.
trip_columns = [
    ("dispatching_base_num", T.StringType()),
    ("pickup_datetime", T.TimestampType()),
    ("dropoff_datetime", T.TimestampType()),
    ("PULocationID", T.IntegerType()),
    ("DOLocationID", T.IntegerType()),
    ("SR_Flag", T.StringType()),
    ("Affiliated_base_number", T.StringType()),
]
trips_schema = T.StructType(
    [T.StructField(name, dtype, True) for name, dtype in trip_columns]
)

# Read the CSV with the explicit schema, treating the first row as a header.
trips = (
    spark.read
    .schema(trips_schema)
    .option("header", True)
    .csv(csv_path.as_posix())
)

In [5]:
# Output directory for the parquet dataset (one part file per partition).
parquet_path = Path("/home/michal/Projects/de_zoomcamp_23/data/transformed/fhv_tripdata/hvfhv_2021_06.parquet")

# Redistribute the rows over 12 partitions, then write them out, replacing
# any output left behind by a previous run.
trips = trips.repartition(numPartitions=12)
trips.write.mode("overwrite").parquet(parquet_path.as_posix())

                                                                                

In [6]:
!du -sh /home/michal/Projects/de_zoomcamp_23/data/transformed/fhv_tripdata/hvfhv_2021_06.parquet/*

24M	/home/michal/Projects/de_zoomcamp_23/data/transformed/fhv_tripdata/hvfhv_2021_06.parquet/part-00000-e72dbede-79dc-4560-b74e-d96b16484e46-c000.snappy.parquet
24M	/home/michal/Projects/de_zoomcamp_23/data/transformed/fhv_tripdata/hvfhv_2021_06.parquet/part-00001-e72dbede-79dc-4560-b74e-d96b16484e46-c000.snappy.parquet
24M	/home/michal/Projects/de_zoomcamp_23/data/transformed/fhv_tripdata/hvfhv_2021_06.parquet/part-00002-e72dbede-79dc-4560-b74e-d96b16484e46-c000.snappy.parquet
24M	/home/michal/Projects/de_zoomcamp_23/data/transformed/fhv_tripdata/hvfhv_2021_06.parquet/part-00003-e72dbede-79dc-4560-b74e-d96b16484e46-c000.snappy.parquet
24M	/home/michal/Projects/de_zoomcamp_23/data/transformed/fhv_tripdata/hvfhv_2021_06.parquet/part-00004-e72dbede-79dc-4560-b74e-d96b16484e46-c000.snappy.parquet
24M	/home/michal/Projects/de_zoomcamp_23/data/transformed/fhv_tripdata/hvfhv_2021_06.parquet/part-00005-e72dbede-79dc-4560-b74e-d96b16484e46-c000.snappy.parquet
24M	/home/michal/Projects/de_zoomc

We can see that each of the parquet part files is around 24 MB, so the average file size is approximately 24 MB.

**Question 3. How many taxi trips were there on June 15? Consider only trips that started that day.**

Answer:

In [7]:
# Work from the parquet copy written earlier rather than the original CSV.
trips = spark.read.parquet(parquet_path.as_posix())

# A trip counts for June 15 when its pickup time falls in the half-open
# interval [2021-06-15 00:00, 2021-06-16 00:00).
pickup_time = F.col("pickup_datetime")
started_on_june_15 = (pickup_time >= "2021-06-15") & (pickup_time < "2021-06-16")

trips_june_15 = trips.where(started_on_june_15)
trips_june_15.count()

                                                                                

452470

**Question 4. How long was the longest trip in hours?**

Answer:

In [8]:
# Trip duration in whole seconds: casting a timestamp to long yields epoch
# seconds, so the difference of the two casts is the elapsed time.
duration_seconds = (
    F.col("dropoff_datetime").cast("long") - F.col("pickup_datetime").cast("long")
)

# Keep only the single longest trip, reporting its duration in seconds and hours.
(
    trips
    .select(
        "pickup_datetime",
        "dropoff_datetime",
        duration_seconds.alias("trip_duration_in_seconds"),
        (duration_seconds / 3600).alias("trip_duration_in_hours"),
    )
    .orderBy(F.col("trip_duration_in_seconds").desc())
    .take(1)
)

                                                                                

[Row(pickup_datetime=datetime.datetime(2021, 6, 25, 13, 55, 41), dropoff_datetime=datetime.datetime(2021, 6, 28, 8, 48, 25), trip_duration_in_seconds=240764, trip_duration_in_hours=66.8788888888889)]

**Question 5. Which port does the Spark's Web UI run on?**

Answer:

By default, every SparkContext launches a Web UI on port 4040.

**Question 6. Load the zone lookup data into a temporary view in Spark. Using this data, what is the name of the most frequent pickup location zone?**

Answer:

In [9]:
# Taxi zone lookup table; the first row of the CSV holds the column names.
zones_path = Path("/home/michal/Projects/de_zoomcamp_23/data/raw/taxi_zone_lookup.csv")
zones = spark.read.option("header", True).csv(zones_path.as_posix())

# Expose both DataFrames to Spark SQL under fixed view names.
zones.createOrReplaceTempView("Zones")
trips.createOrReplaceTempView("Trips")

In [10]:
# Rank pickup zones by number of trips, most frequent first.
# The LEFT JOIN keeps zones that have no matching trips as a single row whose
# trip columns are NULL. COUNT(*) would report such a zone as having 1 trip;
# counting t.PULocationID skips the NULL and correctly reports 0.
spark.sql("""
SELECT     z.Zone
           , COUNT(t.PULocationID) AS n_trips
FROM       Zones z
LEFT JOIN  Trips t
ON         z.LocationID = t.PULocationID
GROUP BY   z.LocationID
           , z.Zone
ORDER BY   n_trips DESC;
""").show()

23/03/06 20:50:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/03/06 20:50:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/03/06 20:50:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/03/06 20:50:36 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/03/06 20:50:36 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/03/06 20:50:36 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

+--------------------+-------+
|                Zone|n_trips|
+--------------------+-------+
| Crown Heights North| 231279|
|        East Village| 221244|
|         JFK Airport| 188867|
|      Bushwick South| 187929|
|       East New York| 186780|
|TriBeCa/Civic Center| 164344|
|   LaGuardia Airport| 161596|
|            Union Sq| 158937|
|        West Village| 154698|
|             Astoria| 152493|
|     Lower East Side| 151020|
|        East Chelsea| 147673|
|Central Harlem North| 146402|
|Williamsburg (Nor...| 143683|
|          Park Slope| 143594|
|  Stuyvesant Heights| 141427|
|        Clinton East| 139611|
|West Chelsea/Huds...| 139431|
|             Bedford| 138428|
|         Murray Hill| 137879|
+--------------------+-------+
only showing top 20 rows



                                                                                

In [11]:
# All questions are answered; stop the Spark session.
spark.stop()