In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

24/03/02 16:22:49 WARN Utils: Your hostname, codespaces-2912e2 resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
24/03/02 16:22:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/02 16:22:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/02 16:22:51 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
spark.version

'3.5.0'

In [4]:
!ls -lh data/raw/fhv/2019/10/fhv_tripdata_2019-10.csv.gz

-rw-rw-rw- 1 vscode vscode 19M Dec  1  2022 data/raw/fhv/2019/10/fhv_tripdata_2019-10.csv.gz


In [5]:
import pandas as pd

In [6]:
df_fhv_pd = pd.read_csv('data/raw/fhv/2019/10/fhv_tripdata_2019-10.csv.gz', nrows = 1000, keep_default_na=False)

In [7]:
df_fhv_pd

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00009,2019-10-01 00:23:00,2019-10-01 00:35:00,264,264,,B00009
1,B00013,2019-10-01 00:11:29,2019-10-01 00:13:22,264,264,,B00013
2,B00014,2019-10-01 00:11:43,2019-10-01 00:37:20,264,264,,B00014
3,B00014,2019-10-01 00:56:29,2019-10-01 00:57:47,264,264,,B00014
4,B00014,2019-10-01 00:23:09,2019-10-01 00:28:27,264,264,,B00014
...,...,...,...,...,...,...,...
995,B00111,2019-10-01 01:54:57,2019-10-01 03:10:06,264,100,,B00111
996,B00111,2019-10-01 01:07:06,2019-10-01 01:29:54,264,188,,B00111
997,B00112,2019-10-01 01:05:36,2019-10-01 01:36:28,264,228,,B00112
998,B00160,2019-10-01 01:20:00,2019-10-01 01:26:00,264,264,,B00160


In [8]:
spark.createDataFrame(df_fhv_pd).schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [9]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True), 
    types.StructField('pickup_datetime', types.TimestampType(), True), 
    types.StructField('dropOff_datetime',types.TimestampType(), True), 
    types.StructField('PUlocationID', types.IntegerType(), True), 
    types.StructField('DOlocationID',types.IntegerType(), True), 
    types.StructField('SR_Flag', types.StringType(), True), 
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [10]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('data/raw/fhv/2019/10/fhv_tripdata_2019-10.csv.gz')

df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [11]:
df.show()

                                                                                

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
|     B00021         |2019-10-01 00:00:4

In [12]:
df.repartition(6).write.parquet('data/pq/fhvhv/2019/10/')

                                                                                

In [13]:
df.createOrReplaceTempView('fhv')

In [14]:
from pyspark.sql import functions as F

In [15]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .filter("pickup_date = '2019-10-15'") \
    .count()

                                                                                

62610

In [16]:
spark.sql("""
Select to_date(pickup_datetime), count(1) from fhv
where to_date(pickup_datetime) = '2019-10-15'
GROUP BY to_date(pickup_datetime)
""").show()

[Stage 7:>                                                          (0 + 1) / 1]

+------------------------+--------+
|to_date(pickup_datetime)|count(1)|
+------------------------+--------+
|              2019-10-15|   62610|
+------------------------+--------+



                                                                                

**Q4**: Longest trip for each day

In [17]:
# What is the length of the longest trip in the dataset in hours?</br>
spark.sql("""
Select *,((unix_timestamp(dropOff_datetime)-unix_timestamp(pickup_datetime))/3600) from fhv 
order by ((unix_timestamp(dropOff_datetime)-unix_timestamp(pickup_datetime))/3600)  desc limit 1

""").show()

[Stage 10:>                                                         (0 + 1) / 1]

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------------------------------------------------------------------------------------------------------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|((unix_timestamp(dropOff_datetime, yyyy-MM-dd HH:mm:ss) - unix_timestamp(pickup_datetime, yyyy-MM-dd HH:mm:ss)) / 3600)|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------------------------------------------------------------------------------------------------------------------+
|              B02832|2019-10-11 18:00:00|2091-10-11 18:30:00|         264|         264|   NULL|                B02832|                                                                                                               631152.5|
+--------------------+------------------

                                                                                

In [18]:
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropOff_datetime',
 'PUlocationID',
 'DOlocationID',
 'SR_Flag',
 'Affiliated_base_number']

**Q6**: Most common locations pair

In [19]:
df_lookup_pd = pd.read_csv('taxi+_zone_lookup.csv', keep_default_na=False)

In [20]:
spark.createDataFrame(df_lookup_pd).schema

StructType([StructField('LocationID', LongType(), True), StructField('Borough', StringType(), True), StructField('Zone', StringType(), True), StructField('service_zone', StringType(), True)])

In [21]:
lookup_schema = types.StructType([
types.StructField('LocationID', types.IntegerType(), True), 
types.StructField('Borough', types.StringType(), True), 
types.StructField('Zone', types.StringType(), True), 
types.StructField('service_zone', types.StringType(), True)
])

In [22]:
df_lookup = spark.read \
    .option("header", "true") \
    .schema(lookup_schema) \
    .csv('taxi+_zone_lookup.csv')

In [23]:
df_lookup.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [24]:
df_lookup.createOrReplaceTempView('lookup')

In [25]:
# Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location Zone?

spark.sql("""
Select lookup.Zone, count(1) from fhv
left join lookup on fhv.PUlocationID = lookup.LocationID
group by lookup.Zone
order by 2 asc
""").show()

[Stage 13:>                                                         (0 + 1) / 1]

+--------------------+--------+
|                Zone|count(1)|
+--------------------+--------+
|         Jamaica Bay|       1|
|Governor's Island...|       2|
| Green-Wood Cemetery|       5|
|       Broad Channel|       8|
|     Highbridge Park|      14|
|        Battery Park|      15|
|Saint Michaels Ce...|      23|
|Breezy Point/Fort...|      25|
|Marine Park/Floyd...|      26|
|        Astoria Park|      29|
|    Inwood Hill Park|      39|
|       Willets Point|      47|
|Forest Park/Highl...|      53|
|  Brooklyn Navy Yard|      57|
|        Crotona Park|      62|
|        Country Club|      77|
|     Freshkills Park|      89|
|       Prospect Park|      98|
|     Columbia Street|     105|
|  South Williamsburg|     110|
+--------------------+--------+
only showing top 20 rows



                                                                                