In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) a Spark session running against the local master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

24/03/04 22:16:25 WARN Utils: Your hostname, C4P2.local resolves to a loopback address: 127.0.0.1; using 192.168.0.115 instead (on interface en0)
24/03/04 22:16:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/04 22:16:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/04 22:16:27 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Path of the trip-record CSV that the sampling, schema and Spark-read cells below all use.
tripdata_file = "fhv_tripdata_2019-10.csv"

In [4]:
import pandas as pd

In [5]:
# Copy the first 1001 lines of the trip CSV into head.csv so pandas can inspect a small sample.
!head -n 1001 {tripdata_file} > head.csv

In [6]:
# Load the small sample with pandas and preview its first rows
# before declaring an explicit Spark schema.
df_pandas = pd.read_csv('head.csv')
df_pandas.head()


Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00009,2019-10-01 00:23:00,2019-10-01 00:35:00,264.0,264.0,,B00009
1,B00013,2019-10-01 00:11:29,2019-10-01 00:13:22,264.0,264.0,,B00013
2,B00014,2019-10-01 00:11:43,2019-10-01 00:37:20,264.0,264.0,,B00014
3,B00014,2019-10-01 00:56:29,2019-10-01 00:57:47,264.0,264.0,,B00014
4,B00014,2019-10-01 00:23:09,2019-10-01 00:28:27,264.0,264.0,,B00014


In [7]:
# Show the dtypes pandas inferred for each column of the sample.
df_pandas.dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID              float64
DOlocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

In [8]:
from pyspark.sql import types

# Explicit schema for the trip CSV, built fluently; every column is nullable.
schema = (
    types.StructType()
    .add('dispatching_base_num', types.StringType(), True)
    .add('pickup_datetime', types.TimestampType(), True)
    .add('dropOff_datetime', types.TimestampType(), True)
    .add('PULocationID', types.IntegerType(), True)
    .add('DOLocationID', types.IntegerType(), True)
    .add('SR_Flag', types.StringType(), True)
    .add('Affiliated_base_number', types.StringType(), True)
)

In [9]:
# Read the trip CSV with its header row, applying the explicit schema.
df = spark.read.csv(tripdata_file, header=True, schema=schema)

In [10]:
# Rewrite the trips as Parquet in 6 partitions, replacing any previous output.
output_path = "data/fhvhv"
(
    df
    .repartition(6)
    .write
    .mode("overwrite")
    .parquet(output_path)
)

                                                                                

In [11]:
# Rebind `df` to the Parquet files written under `output_path`;
# every later cell works on this Parquet-backed frame.
df = spark.read.parquet(f"{output_path}/*")

Question 3:
Count records

How many taxi trips were there on the 15th of October?

In [12]:
import datetime

# Half-open window [start_date, end_date) covering 15 October 2019.
start_date = datetime.date(2019, 10, 15)
end_date = start_date + datetime.timedelta(days=1)

In [13]:
# Count trips whose pickup falls inside the half-open window [start_date, end_date).
(
    df
    .where(df['pickup_datetime'] >= start_date)
    .where(df['pickup_datetime'] < end_date)
    .count()
)

                                                                                

62610

Question 4:

Longest trip for each day

What is the length of the longest trip in the dataset in hours?

In [14]:
from pyspark.sql import functions as F

In [15]:
def timedelta_to_hours(timedelta_obj):
    """Convert a ``datetime.timedelta`` into a duration expressed in hours.

    Args:
        timedelta_obj: The duration to convert, or ``None``.

    Returns:
        The duration in hours as a float, or ``None`` when the input is
        ``None``.
    """
    # When used as a Spark UDF, null cells arrive as None; pass them through
    # instead of raising AttributeError and failing the whole job.
    if timedelta_obj is None:
        return None
    return timedelta_obj.total_seconds() / 3600
# Register the converter as a Spark UDF with an explicit double return type.
# Without `returnType`, `F.udf` declares the result as a string, so every
# value would be stringified and then re-parsed by a later numeric cast.
calculate_time_diff_in_hours = F.udf(timedelta_to_hours, returnType=types.DoubleType())

In [16]:
# Trip duration: drop-off timestamp minus pickup timestamp.
df = df.withColumn('trip_length', F.col('dropOff_datetime') - F.col('pickup_datetime'))

In [17]:
# Express the trip duration in hours (via the UDF) and store it as a float column.
df = df.withColumn(
    'trip_length_hours',
    calculate_time_diff_in_hours(F.col('trip_length')).cast('float'),
)

In [18]:
# Longest trip: order the hours column descending and take the first row.
df.select("trip_length_hours").orderBy(F.desc("trip_length_hours")).head()

                                                                                

Row(trip_length_hours=631152.5)

Question 6:
Least frequent pickup location zone

Load the zone lookup data into a temp view in Spark
Zone Data

Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location Zone?

In [19]:
# Load the zone lookup table, using the header row and letting Spark infer column types.
df_zones = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("taxi_zone_lookup.csv")
)

                                                                                

In [20]:
# Attach zone details to each trip by matching the pickup location id (inner join).
df_result = df.join(
    df_zones,
    on=df['PULocationID'] == df_zones['LocationID'],
    how="inner",
)

In [21]:
# Peek at the first joined row to confirm the zone columns were attached.
df_result.head()

Row(dispatching_base_num='B02735', pickup_datetime=datetime.datetime(2019, 10, 3, 22, 35, 28), dropOff_datetime=datetime.datetime(2019, 10, 3, 22, 41, 1), PULocationID=264, DOLocationID=259, SR_Flag=None, Affiliated_base_number='B02682', trip_length=datetime.timedelta(seconds=333), trip_length_hours=0.0925000011920929, LocationID=264, Borough='Unknown', Zone='NV', service_zone='N/A')

In [22]:
# Count trips per pickup zone and order ascending so the least frequent zone comes first.
result_df = (
    df_result
    .groupBy('LocationID', 'Zone')
    .count()
    .orderBy(F.col('count').asc())
)
result_df.head()

                                                                                

Row(LocationID=2, Zone='Jamaica Bay', count=1)