In [15]:
import pandas as pd
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [2]:
# Create (or reuse) the Spark session for this notebook on the local[*] master.

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

22/02/28 20:37:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Question 1
#### Execute spark.version

In [4]:
# Answer to Question 1: show the version string of the running Spark session.
spark.version

'3.0.3'

### Question 2. FHVHV February 2021

In [5]:
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-02-28 20:39:15--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.235.161
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.235.161|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘fhvhv_tripdata_2021-02.csv’


2022-02-28 20:39:36 (33.4 MB/s) - ‘fhvhv_tripdata_2021-02.csv’ saved [733822658/733822658]



In [9]:
# Inspect the column layout with pandas. Only a small sample of rows is read:
# the file is ~700 MB and the dtypes can be inferred without loading all of it.

pandas_df = pd.read_csv('fhvhv_tripdata_2021-02.csv', nrows=1000)

pandas_df.head()

Unnamed: 0,hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag
0,HV0003,B02764,2021-02-01 00:10:40,2021-02-01 00:21:09,35,39,
1,HV0003,B02764,2021-02-01 00:27:23,2021-02-01 00:44:01,39,35,
2,HV0005,B02510,2021-02-01 00:28:38,2021-02-01 00:38:27,39,91,
3,HV0005,B02510,2021-02-01 00:43:37,2021-02-01 01:23:20,91,228,
4,HV0003,B02872,2021-02-01 00:08:42,2021-02-01 00:17:57,126,250,


In [13]:
# Column dtypes as inferred by pandas; used as a guide for the Spark schema below.
pandas_df.dtypes

hvfhs_license_num        object
dispatching_base_num     object
pickup_datetime          object
dropoff_datetime         object
PULocationID              int64
DOLocationID              int64
SR_Flag                 float64
dtype: object

In [50]:
# Explicit Spark schema for the trip CSV. Every column is declared nullable.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('hvfhs_license_num', types.StringType()),
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
    )
])

In [51]:
# Load the trip CSV into a Spark DataFrame, treating the first line as the
# header and applying the explicit schema instead of inferring one.
df = spark.read.csv(
    'fhvhv_tripdata_2021-02.csv',
    header=True,
    schema=schema,
)

In [52]:
# Confirm that the DataFrame carries the explicit schema defined above.
df.schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,TimestampType,true),StructField(dropoff_datetime,TimestampType,true),StructField(PULocationID,IntegerType,true),StructField(DOLocationID,IntegerType,true),StructField(SR_Flag,StringType,true)))

In [22]:
# Redistribute the rows across 24 partitions before the DataFrame is written out.
df = df.repartition(24)

In [23]:
# Write the repartitioned trips as parquet. mode='overwrite' replaces any output
# left by a previous run; with the default mode the write raises an error when
# the target directory already exists, which breaks "Restart & Run All".
df.write.parquet('fhvhv/2021/02/', mode='overwrite')

                                                                                

### Question 3: Count records

#### How many taxi trips were there on February 15?

In [53]:
# Register the Spark DataFrame as a temporary view so it can be queried with SQL.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable and is safe to call again on re-runs.
df.createOrReplaceTempView('fhvhomework_data')

In [31]:
# Count the trips whose pickup timestamp falls on 15 February 2021.
feb15_trips_sql = """
select count(*)

from fhvhomework_data

where DATE(pickup_datetime) = '2021-02-15'"""

spark.sql(feb15_trips_sql).show()



+--------+
|count(1)|
+--------+
|  367170|
+--------+



                                                                                

### Question 4. Longest trip for each day
#### Now calculate the duration for each trip. Trip starting on which day was the longest?

In [32]:
# First calculate the duration of each trip: dropoff_datetime - pickup_datetime

from pyspark.sql import functions as F

In [42]:
# The trip duration is computed directly in the SQL query that follows, so no
# extra column is added to the DataFrame here.
# (Re-)register the DataFrame as a temporary view for SQL access;
# createOrReplaceTempView replaces the deprecated registerTempTable.
df.createOrReplaceTempView('fhvhomework_data')

In [55]:
# List the ten longest trips. trip_duration is the difference between the
# dropoff and pickup Unix timestamps, i.e. the trip length in seconds.
longest_trips_sql = """
select pickup_datetime,dropoff_datetime,
(unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime)) as trip_duration

from fhvhomework_data

order by trip_duration

DESC

limit 10"""

spark.sql(longest_trips_sql).show()



+-------------------+-------------------+-------------+
|    pickup_datetime|   dropoff_datetime|trip_duration|
+-------------------+-------------------+-------------+
|2021-02-11 13:40:44|2021-02-12 10:39:44|        75540|
|2021-02-17 15:54:53|2021-02-18 07:48:34|        57221|
|2021-02-20 12:08:15|2021-02-21 00:22:14|        44039|
|2021-02-03 20:24:25|2021-02-04 07:41:58|        40653|
|2021-02-19 23:17:44|2021-02-20 09:44:01|        37577|
|2021-02-25 17:13:35|2021-02-26 02:57:05|        35010|
|2021-02-20 01:36:13|2021-02-20 11:16:19|        34806|
|2021-02-18 15:24:19|2021-02-19 01:01:11|        34612|
|2021-02-18 01:31:20|2021-02-18 11:07:15|        34555|
|2021-02-10 20:51:39|2021-02-11 06:21:08|        34169|
+-------------------+-------------------+-------------+



                                                                                

### Question 5. Most frequent dispatching_base_num
#### Now find the most frequently occurring dispatching_base_num in this dataset?

In [58]:
# Count trips per dispatching base, most frequent first
# (group by / order by refer to the select-list positions).
base_counts_sql = """
select dispatching_base_num,
count(1)

from fhvhomework_data

group by 1
order by 2
DESC"""

spark.sql(base_counts_sql).show()



+--------------------+--------+
|dispatching_base_num|count(1)|
+--------------------+--------+
|              B02510| 3233664|
|              B02764|  965568|
|              B02872|  882689|
|              B02875|  685390|
|              B02765|  559768|
|              B02869|  429720|
|              B02887|  322331|
|              B02871|  312364|
|              B02864|  311603|
|              B02866|  311089|
|              B02878|  305185|
|              B02682|  303255|
|              B02617|  274510|
|              B02883|  251617|
|              B02884|  244963|
|              B02882|  232173|
|              B02876|  215693|
|              B02879|  210137|
|              B02867|  200530|
|              B02877|  198938|
+--------------------+--------+
only showing top 20 rows



                                                                                

### Question 6. Most common locations pair
#### Find the most common pickup-dropoff pair.

In [59]:
#We need to join on the zones table

!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

--2022-02-28 22:31:09--  https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.217.67.134
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.217.67.134|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [application/octet-stream]
Saving to: ‘taxi+_zone_lookup.csv’


2022-02-28 22:31:09 (142 MB/s) - ‘taxi+_zone_lookup.csv’ saved [12322/12322]



In [67]:
# Peek at the zone lookup table with pandas: first five rows.
zones_pd = pd.read_csv('taxi+_zone_lookup.csv')

zones_pd.head(5)

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone


In [68]:
# Last rows of the lookup table, to see how the final LocationIDs are labelled.
zones_pd.tail()

Unnamed: 0,LocationID,Borough,Zone,service_zone
260,261,Manhattan,World Trade Center,Yellow Zone
261,262,Manhattan,Yorkville East,Yellow Zone
262,263,Manhattan,Yorkville West,Yellow Zone
263,264,Unknown,NV,
264,265,Unknown,,


In [61]:
# Load the zone lookup CSV into Spark, using the first line as the header.
# No schema is supplied here.
zones = spark.read.csv('taxi+_zone_lookup.csv', header=True)

In [64]:
# Register the zone lookup DataFrame as a temporary view for SQL access.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable and is safe to call again on re-runs.
zones.createOrReplaceTempView('zones')

In [79]:
# Count trips per (pickup, dropoff) location pair, most frequent first.
# The previous left join with `zones` was dropped: none of its columns were
# selected or grouped on, so it only added the cost of a join over the whole
# trip table (and would inflate the counts if a LocationID were ever duplicated).
# The zone name for the winning ID is looked up separately afterwards.
spark.sql("""
select PULocationID,DOLocationID,
count(*)

from fhvhomework_data

group by PULocationID,DOLocationID
order by 3
DESC""").show()



+------------+------------+--------+
|PULocationID|DOLocationID|count(1)|
+------------+------------+--------+
|          76|          76|   45041|
|          26|          26|   37329|
|          39|          39|   28026|
|          61|          61|   25976|
|          14|          14|   17934|
|           7|           7|   14688|
|         129|         129|   14688|
|          42|          42|   14481|
|          37|          37|   14424|
|          89|          89|   13976|
|         216|         216|   13716|
|          35|          35|   12829|
|         132|         265|   12542|
|         188|          61|   11814|
|          95|          95|   11548|
|          36|          37|   11491|
|          37|          36|   11487|
|          61|         188|   11462|
|          61|         225|   11342|
|         188|         188|   11308|
+------------+------------+--------+
only showing top 20 rows



                                                                                

In [84]:
# Look up the zone name for LocationID 76, the most common pickup/dropoff pair.
# LocationID is compared as a string because the lookup CSV was read without a schema.
zones.filter(zones.LocationID == '76').select('Zone').show()

+-------------+
|         Zone|
+-------------+
|East New York|
+-------------+

