In [28]:
# !pip install findspark

You should consider upgrading via the '/Users/muh_ilhamfajar/workspace/de-zoomcamp/week_5_batch_processing/venv/bin/python3 -m pip install --upgrade pip' command.[0m


In [29]:
# Initialise findspark ahead of the pyspark imports in the next cell.
# NOTE(review): presumably this locates the local Spark install (e.g. via SPARK_HOME) — confirm.
import findspark
findspark.init()

In [73]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, TimestampType
from pyspark.sql.functions import col, to_date

In [31]:
# Build the Spark session for this notebook on the local master;
# getOrCreate() hands back an existing session if one is already running.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [119]:
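# One-time download of the inputs (uncomment to fetch): FHV trips for 2019-10 and the taxi zone lookup table.
# !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
# !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv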

--2024-03-02 17:02:44--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 20.205.243.166
Connecting to github.com (github.com)|20.205.243.166|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240302%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240302T100245Z&X-Amz-Expires=300&X-Amz-Signature=915aa4f44e836cd2caf4a9f1ae006888536dae95396b8933416da85eaaa64789&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2024-03-02 17:02:45--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62

In [33]:
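# Count CSV rows by decompressing first; `wc -l` on the .gz itself only counts newline bytes in the compressed data.
# !gunzip -c fhv_tripdata_2019-10.csv.gz | wc -l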

In [34]:
# Explicit schema for the FHV trip CSV: two timestamps, two integer location IDs,
# and string columns for the rest. Every field is declared nullable.
schema = (
    StructType()
    .add('dispatching_base_num', StringType(), True)
    .add('pickup_datetime', TimestampType(), True)
    .add('dropOff_datetime', TimestampType(), True)
    .add('PUlocationID', IntegerType(), True)
    .add('DOlocationID', IntegerType(), True)
    .add('SR_Flag', StringType(), True)
    .add('Affiliated_base_number', StringType(), True)
)

In [38]:
# Load the gzipped FHV trip CSV, treating the first line as a header and
# applying the explicit schema instead of inferring types.
df = spark.read.csv(
    'fhv_tripdata_2019-10.csv.gz',
    schema=schema,
    header=True,
)

[Row(dispatching_base_num='B00009', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 23), dropOff_datetime=datetime.datetime(2019, 10, 1, 0, 35), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00009'),
 Row(dispatching_base_num='B00013', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 11, 29), dropOff_datetime=datetime.datetime(2019, 10, 1, 0, 13, 22), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00013'),
 Row(dispatching_base_num='B00014', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 11, 43), dropOff_datetime=datetime.datetime(2019, 10, 1, 0, 37, 20), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00014'),
 Row(dispatching_base_num='B00014', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 56, 29), dropOff_datetime=datetime.datetime(2019, 10, 1, 0, 57, 47), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00014'),
 Row(dispatching_base_num='B00014', pickup_datetime=

In [40]:
# Redistribute the rows across 6 partitions before writing.
df = df.repartition(numPartitions=6)

In [42]:
# Persist the trips as Parquet. "overwrite" replaces output left by a previous run,
# so re-executing the notebook does not fail because the directory already exists.
# Run this while `df` still comes from the CSV: overwriting a path that `df` itself
# is being read from is not supported.
df.write.mode("overwrite").parquet('fhv_trip/2019/10/')

                                                                                

In [107]:
# Reload the trips from the Parquet directory.
df = spark.read.parquet('fhv_trip/2019/10/')

In [108]:
# Print the column names and types of the DataFrame reloaded from Parquet.
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [109]:
# To filter on 2019-10-15
# Derive the calendar date of each pickup, then keep only the trips whose
# pickup date is 2019-10-15 and show a sample of them.
df = df.withColumn("pickup_date", to_date(col("pickup_datetime")))
filterred_trip = df.where(col("pickup_date") == "2019-10-15")
filterred_trip.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|pickup_date|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+
|              B03162|2019-10-15 09:34:17|2019-10-15 10:16:32|          69|          32|   NULL|                B03162| 2019-10-15|
|              B02182|2019-10-15 11:45:48|2019-10-15 12:33:24|         264|         264|   NULL|                B02835| 2019-10-15|
|              B00256|2019-10-15 09:41:20|2019-10-15 10:18:31|         264|         264|   NULL|                B00256| 2019-10-15|
|              B00706|2019-10-15 17:30:17|2019-10-15 17:53:42|         245|         221|   NULL|                B00706| 2019-10-15|
|              B00457|2019-10-15 05:09:51|2019-10-15 05:18:50|         264| 

In [118]:
# Find the longest trip
df = df.withColumn("duration_seconds", (col("dropOff_datetime").cast("long") - col("pickup_datetime").cast("long")))
longest_trip = df.orderBy(col("duration_seconds").desc()).first()

duration_seconds = longest_trip["duration_seconds"]
duration_hours = duration_seconds / 3600

631152.5


In [127]:
# Least frequent pickup location zone
# Zone lookup table: first line is the header, column types are inferred.
zone_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('taxi_zone_lookup.csv')
)

# Number of trips per pickup location ID.
pickup_counts_df = df.groupBy("PUlocationID").count()

# Attach the zone details to each count by matching the pickup ID to LocationID.
result_df = pickup_counts_df.join(
    zone_df,
    on=pickup_counts_df["PUlocationID"] == zone_df["LocationID"],
)

# Ascending sort on the trip count; the first row is the least frequent zone.
least_frequent_zone = result_df.sort(col("count").asc()).first()

# Display the result
print("Least frequent pickup location zone:")
least_frequent_zone

Least frequent pickup location zone:


Row(PUlocationID=2, count=1, LocationID=2, Borough='Queens', Zone='Jamaica Bay', service_zone='Boro Zone')