<a href="https://colab.research.google.com/github/rmengato/DE_Zoomcamp_files_w5/blob/main/pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [35]:
import pyspark
from pyspark.sql import functions as F

In [36]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [37]:
spark.version

'3.5.5'

In [38]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-03-06 21:44:58--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 13.226.36.218, 13.226.36.130, 13.226.36.73, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|13.226.36.218|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet.1’


2025-03-06 21:44:59 (62.7 MB/s) - ‘yellow_tripdata_2024-10.parquet.1’ saved [64346071/64346071]



In [52]:
df = spark.read.parquet("/content/yellow_tripdata_2024-10.parquet")

In [53]:
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

In [54]:
#verify data types
df.schema

StructType([StructField('VendorID', IntegerType(), True), StructField('tpep_pickup_datetime', TimestampNTZType(), True), StructField('tpep_dropoff_datetime', TimestampNTZType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('Airport_fee', DoubleType(), True)])

In [55]:
#question about partition and file sizes
df = df.repartition(4)

In [56]:
#df.write.parquet("spark_oct_24_taxi_data", mode = "overwrite")

In [45]:
#!zip -r spark_oct_24_taxi_data.zip spark_oct_24_taxi_data/

In [46]:
"""
from google.colab import files
files.download("spark_oct_24_taxi_data.zip")
"""

'\nfrom google.colab import files\nfiles.download("spark_oct_24_taxi_data.zip")\n'

In [62]:
#how many trips started on oct 15?

df = df.withColumn("pickup_date", F.to_date(df.tpep_pickup_datetime))\
  .withColumn("dropoff_date", F.to_date(df.tpep_dropoff_datetime))


In [63]:
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|pickup_date|dropoff_date|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+------------+
|       2| 2024-10-04 23:19:27|  2024-10-04 23:43:28|              1|         6.07|         1|                 N

In [65]:
df.filter(df.pickup_date == "2024-10-15").count()

128893

In [75]:
#longest trip in the dataset in hours?

df = df.withColumn("trip_duration", df.tpep_dropoff_datetime - df.tpep_pickup_datetime)

df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|pickup_date|dropoff_date|       trip_duration|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+------------+--------------------+
|       2| 2024-10-04 23:19:27|  2024-10-04 23:43

In [77]:
df = df.withColumn(
    "duration_hours",
    F.expr("extract(DAY from trip_duration) * 24 + extract(HOUR from trip_duration) + extract(MINUTE from trip_duration) / 60")
)

In [78]:
df.select(F.max("duration_hours")).show()

+-------------------+
|max(duration_hours)|
+-------------------+
| 162.61666666666667|
+-------------------+



In [79]:
#Load zone and find least frequent pickup location

!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-03-06 22:09:43--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 13.226.36.218, 13.226.36.130, 13.226.36.73, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|13.226.36.218|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv’


2025-03-06 22:09:43 (6.99 MB/s) - ‘taxi_zone_lookup.csv’ saved [12331/12331]



In [82]:
df_zones = spark.read.option("header",True)\
  .option("inferSchema",True) \
  .csv("/content/taxi_zone_lookup.csv")

In [83]:
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [85]:
df = df.join(df_zones,df.PULocationID ==  df_zones.LocationID,"leftouter")

In [90]:
df.groupBy("Zone").count().sort("count", ascending = True).show()

+--------------------+-----+
|                Zone|count|
+--------------------+-----+
|Governor's Island...|    1|
|       Rikers Island|    2|
|       Arden Heights|    2|
| Green-Wood Cemetery|    3|
|         Jamaica Bay|    3|
|Charleston/Totten...|    4|
|   Rossville/Woodrow|    4|
|       West Brighton|    4|
|       Port Richmond|    4|
|Eltingville/Annad...|    4|
|         Great Kills|    6|
|        Crotona Park|    6|
|Heartland Village...|    7|
|     Mariners Harbor|    7|
|Saint George/New ...|    9|
|             Oakwood|    9|
|       Broad Channel|   10|
|New Dorp/Midland ...|   10|
|         Westerleigh|   12|
|     Pelham Bay Park|   12|
+--------------------+-----+
only showing top 20 rows

