In [1]:
import pyspark
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session running in local mode on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

# First look at the raw CSV: only the header option is set, no schema is given.
df = (
    spark.read
    .option("header", "true")
    .csv('fhv_tripdata_2019-10.csv')
)

# Print a preview of the first rows to inspect column names and values.
df.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/02/24 15:45:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


[Stage 0:>                                                          (0 + 1) / 1]                                                                                

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
|     B00021         |2019-10-01 00:00:4

In [6]:
from pyspark.sql import types
from pyspark.sql.functions import col, to_date


In [8]:
# Explicit schema for the FHV trip CSV, built field by field.
# Each add() call is (column name, Spark type, nullable); all columns are nullable.
schema = (
    types.StructType()
    .add('dispatching_base_num', types.StringType(), True)
    .add('pickup_datetime', types.TimestampType(), True)
    .add('dropOff_datetime', types.TimestampType(), True)
    .add('PULocationID', types.IntegerType(), True)
    .add('DOLocationID', types.IntegerType(), True)
    .add('SR_Flag', types.StringType(), True)
    .add('Affiliated_base_number', types.StringType(), True)
)

In [13]:
# Re-read the CSV, this time applying the explicit schema so the columns
# get the declared types.
df = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .csv('fhv_tripdata_2019-10.csv')
)

In [14]:
# Redistribute the rows across six partitions.
df = df.repartition(numPartitions=6)

In [15]:
# Persist the repartitioned data as Parquet.
# mode('overwrite') replaces any existing output so this cell can be re-run
# (e.g. Restart & Run All); the default save mode raises an error when the
# target path already exists.
df.write.mode('overwrite').parquet('fhv/2019/10/')

                                                                                

In [17]:
# Keep only the trips whose pickup date is 2019-10-15, then project the
# timestamp and location columns.
filtered_df = (
    df.where(to_date(col('pickup_datetime')) == '2019-10-15')
      .select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID')
)

# Count the trips picked up on that day and report the total.
row_count = filtered_df.count()
print("Number of rows in the 2019-10-15 filtered DataFrame:", row_count)



Number of rows in the filtered DataFrame: 62610


                                                                                

In [19]:
# Total number of rows in df (no date filter applied); shown as the cell output.
df.count()

                                                                                

1897493

In [21]:
# Trip length in hours: cast both timestamps to long (epoch seconds),
# subtract, and divide by 3600 seconds per hour.
pickup_secs = col("pickup_datetime").cast("long")
dropoff_secs = col("dropOff_datetime").cast("long")
df = df.withColumn("trip_duration_hrs", (dropoff_secs - pickup_secs) / 3600)

# Sort by duration, longest first, and take the top row.
longest_trip = df.sort("trip_duration_hrs", ascending=False).first()

# Report the row describing the longest trip.
print("Longest Trip:")
print(longest_trip)

[Stage 26:>                                                         (0 + 4) / 6]

Longest Trip:
Row(dispatching_base_num='B02832', pickup_datetime=datetime.datetime(2019, 10, 11, 18, 0), dropOff_datetime=datetime.datetime(2091, 10, 11, 18, 30), PULocationID=264, DOLocationID=264, SR_Flag=None, Affiliated_base_number='B02832', trip_duration=2272149000, trip_duration_hrs=631152.5)




In [25]:
# Download the taxi zone lookup CSV and load it into a Spark DataFrame.
import requests

url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv"
local_file_path = "taxi_zone_lookup.csv"  # Local file path where the downloaded file will be saved

# A timeout keeps the cell from hanging indefinitely on a stalled connection.
response = requests.get(url, timeout=60)
# Fail loudly on an HTTP error instead of saving an error page as the CSV.
response.raise_for_status()

with open(local_file_path, "wb") as f:
    f.write(response.content)

# Only the header option is set, so no schema is applied to the lookup table.
zone_df = spark.read.option("header", "true").csv(local_file_path)


+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [27]:

# Join the FHV data with the zone lookup data on the pickup location ID.
# This is an inner join, so trips whose PULocationID has no match in the
# lookup table (including nulls) are dropped.
# NOTE(review): zone_df is read without a schema while PULocationID is declared
# as an integer, so this comparison relies on Spark's implicit cast - confirm.
joined_df = df.join(zone_df, df.PULocationID == zone_df.LocationID)

# Calculate the count of pickups for each zone name.
pickup_counts = joined_df.groupBy("Zone").count()

# Find the zone with the least frequent pickups.
# "Zone" is a secondary sort key so the result is deterministic when several
# zones share the same minimum count (ties resolve alphabetically).
least_frequent_zone = pickup_counts.orderBy("count", "Zone").first()["Zone"]

# Print the name of the least frequent pickup location zone
print("Name of the LEAST frequent pickup location Zone:", least_frequent_zone)




Name of the LEAST frequent pickup location Zone: Jamaica Bay


                                                                                