In [1]:
import urllib.request

import pyspark
from pyspark.sql import SparkSession, types

## Question 1: Install PySpark

In [2]:
spark = SparkSession.builder.getOrCreate()

22/03/01 21:44:23 WARN Utils: Your hostname, Alexs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.2.19 instead (on interface en0)
22/03/01 21:44:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/01 21:44:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark.version

'3.2.1'

## Question 2: HVFHW February 2021

In [None]:
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

In [None]:
!wc -l fhvhv_tripdata_2021-02.csv

In [4]:
df = spark.read.option("header", "true").csv('fhvhv_tripdata_2021-02.csv')

In [5]:
df.schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,StringType,true),StructField(DOLocationID,StringType,true),StructField(SR_Flag,StringType,true)))

In [6]:
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [7]:
df = spark.read.option("header", "true").schema(schema).csv('fhvhv_tripdata_2021-02.csv')

In [8]:
df.head(5)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 2, 1, 0, 10, 40), dropoff_datetime=datetime.datetime(2021, 2, 1, 0, 21, 9), PULocationID=35, DOLocationID=39, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 2, 1, 0, 27, 23), dropoff_datetime=datetime.datetime(2021, 2, 1, 0, 44, 1), PULocationID=39, DOLocationID=35, SR_Flag=None),
 Row(hvfhs_license_num='HV0005', dispatching_base_num='B02510', pickup_datetime=datetime.datetime(2021, 2, 1, 0, 28, 38), dropoff_datetime=datetime.datetime(2021, 2, 1, 0, 38, 27), PULocationID=39, DOLocationID=91, SR_Flag=None),
 Row(hvfhs_license_num='HV0005', dispatching_base_num='B02510', pickup_datetime=datetime.datetime(2021, 2, 1, 0, 43, 37), dropoff_datetime=datetime.datetime(2021, 2, 1, 1, 23, 20), PULocationID=91, DOLocationID=228, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02872', pickup_datetime=d

In [None]:
df = df.repartition(24)

In [None]:
df.write.parquet('fhvhv/2021/02/')

In [None]:
!du -sh fhvhv/2021/02/

## Question 3: Count records

In [9]:
df = spark.read.parquet('fhvhv/2021/02/')

In [10]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [11]:
df.registerTempTable('trips_data')



In [12]:
spark.sql("""
SELECT
    COUNT(*)
FROM
    trips_data
WHERE
    CAST(pickup_datetime AS date) = '2021-02-15';
""").show()

[Stage 3:>                                                        (0 + 10) / 12]

+--------+
|count(1)|
+--------+
|  367170|
+--------+



                                                                                

## Question 4: Longest trip for each day

In [13]:
spark.sql("""
SELECT
    pickup_datetime,
    dropoff_datetime,
    dropoff_datetime - pickup_datetime AS travel_time
FROM
    trips_data
ORDER BY
    travel_time DESC
LIMIT 1;
""").show()



+-------------------+-------------------+--------------------+
|    pickup_datetime|   dropoff_datetime|         travel_time|
+-------------------+-------------------+--------------------+
|2021-02-11 13:40:44|2021-02-12 10:39:44|INTERVAL '0 20:59...|
+-------------------+-------------------+--------------------+



                                                                                

## Question 5: Most frequent dispatching_base_num

In [14]:
spark.sql("""
SELECT
    dispatching_base_num,
    COUNT(dispatching_base_num) AS most_frequent
FROM
    trips_data
GROUP BY
    1
ORDER BY
    2 DESC;
""").show()

+--------------------+-------------+
|dispatching_base_num|most_frequent|
+--------------------+-------------+
|              B02510|      3233664|
|              B02764|       965568|
|              B02872|       882689|
|              B02875|       685390|
|              B02765|       559768|
|              B02869|       429720|
|              B02887|       322331|
|              B02871|       312364|
|              B02864|       311603|
|              B02866|       311089|
|              B02878|       305185|
|              B02682|       303255|
|              B02617|       274510|
|              B02883|       251617|
|              B02884|       244963|
|              B02882|       232173|
|              B02876|       215693|
|              B02879|       210137|
|              B02867|       200530|
|              B02877|       198938|
+--------------------+-------------+
only showing top 20 rows



## Question 6: Most common locations pair

In [None]:
!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

In [15]:
!wc -l taxi+_zone_lookup.csv

     266 taxi+_zone_lookup.csv


In [16]:
df_zones = spark.read.option("header", "true").csv('taxi+_zone_lookup.csv')

In [17]:
df_zones.schema

StructType(List(StructField(LocationID,StringType,true),StructField(Borough,StringType,true),StructField(Zone,StringType,true),StructField(service_zone,StringType,true)))

In [18]:
df_zones.head()

Row(LocationID='1', Borough='EWR', Zone='Newark Airport', service_zone='EWR')

In [19]:
schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('Zone', types.StringType(), True),
    types.StructField('service_zone', types.TimestampType(), True),
])

In [20]:
df_zones = spark.read.option("header", "true").schema(schema).csv('taxi+_zone_lookup.csv')

In [21]:
df_zones.registerTempTable('zones_data')

In [44]:
df_join = df.join(df_zones, df.PULocationID==df_zones.LocationID, how='left')
df_join = df_join.withColumnRenamed('Zone', 'PUZone')
df_join = df_join.drop('LocationID')
df_join.head()

Row(hvfhs_license_num='HV0005', dispatching_base_num='B02510', pickup_datetime=datetime.datetime(2021, 2, 4, 7, 50, 44), dropoff_datetime=datetime.datetime(2021, 2, 4, 8, 19, 59), PULocationID=181, DOLocationID=71, SR_Flag=None, Borough='Brooklyn', PUZone='Park Slope', service_zone=None)

In [45]:
df_join = df_join.join(df_zones, df_join.DOLocationID==df_zones.LocationID, how='left')
df_join = df_join.withColumnRenamed('Zone', 'DOZone')
df_join = df_join.drop('LocationID')
df_join.head()

Row(hvfhs_license_num='HV0005', dispatching_base_num='B02510', pickup_datetime=datetime.datetime(2021, 2, 4, 7, 50, 44), dropoff_datetime=datetime.datetime(2021, 2, 4, 8, 19, 59), PULocationID=181, DOLocationID=71, SR_Flag=None, Borough='Brooklyn', PUZone='Park Slope', service_zone=None, Borough='Brooklyn', DOZone='East Flatbush/Farragut', service_zone=None)

In [34]:
df_join.registerTempTable('fhvhv_zones_data')



In [38]:
spark.sql("""
SELECT
    CONCAT(COALESCE(PUZone, 'Unknown'), '/', COALESCE(DOZone, 'Unknown')) as pickup_dropoff,
    COUNT(1) as frequency
FROM
    fhvhv_zones_data
GROUP BY
    pickup_dropoff
ORDER BY
    frequency DESC
LIMIT 1;
""").show()



+--------------------+---------+
|      pickup_dropoff|frequency|
+--------------------+---------+
|East New York/Eas...|    45041|
+--------------------+---------+



                                                                                

## Bonus: Join type