In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

In [None]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

### Q1

In [3]:
spark.version

'3.4.2'

In [None]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

In [24]:
schema = types.StructType(
	[
		types.StructField('dispatching_base_num', types.StringType(), True), 
		types.StructField('pickup_datetime', types.TimestampType(), True), 
		types.StructField('dropOff_datetime', types.TimestampType(), True), 
		types.StructField('PUlocationID', types.IntegerType(), True), 
		types.StructField('DOlocationID', types.IntegerType(), True), 
		types.StructField('SR_Flag', types.StringType(), True), 
		types.StructField('Affiliated_base_number', types.StringType(), True)
	]
)


In [8]:
df = spark.read \
    .option('header','true') \
    .schema(schema) \
    .csv('fhv_tripdata_2019-10.csv.gz')

In [9]:
df.show()


+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
|     B00021         |2019-10-01 00:00:4

In [20]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropOff_datetime', TimestampType(), True), StructField('PUlocationID', IntegerType(), True), StructField('DOlocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [24]:
df.repartition(6).write.parquet('data/pq/fhvhv/2019/10/')

                                                                                

### Q2

In [26]:
!ls -lh ./data/pq/fhvhv/2019/10/


total 39M
-rw-r--r-- 1 pedro pedro 6,4M mar  6 17:29 part-00000-1aa62fe7-64b8-49c1-8a1b-38b3982c7ea2-c000.snappy.parquet
-rw-r--r-- 1 pedro pedro 6,4M mar  6 17:29 part-00001-1aa62fe7-64b8-49c1-8a1b-38b3982c7ea2-c000.snappy.parquet
-rw-r--r-- 1 pedro pedro 6,4M mar  6 17:29 part-00002-1aa62fe7-64b8-49c1-8a1b-38b3982c7ea2-c000.snappy.parquet
-rw-r--r-- 1 pedro pedro 6,4M mar  6 17:29 part-00003-1aa62fe7-64b8-49c1-8a1b-38b3982c7ea2-c000.snappy.parquet
-rw-r--r-- 1 pedro pedro 6,4M mar  6 17:29 part-00004-1aa62fe7-64b8-49c1-8a1b-38b3982c7ea2-c000.snappy.parquet
-rw-r--r-- 1 pedro pedro 6,4M mar  6 17:29 part-00005-1aa62fe7-64b8-49c1-8a1b-38b3982c7ea2-c000.snappy.parquet
-rw-r--r-- 1 pedro pedro    0 mar  6 17:29 _SUCCESS


### Q3

In [10]:
# df.registerTempTable('tmp_table')
df.createOrReplaceTempView('tmp_table')

In [35]:
spark.sql('''
    SELECT count(1) FROM tmp_table
    WHERE to_date(pickup_datetime) = "2019-10-15" 
''').show()

[Stage 18:>                                                         (0 + 1) / 1]

+--------+
|count(1)|
+--------+
|   62610|
+--------+



                                                                                

### Q4

In [59]:
spark.sql('''
    SELECT
    TO_DATE(pickup_datetime) AS pickup_date
    ,DATEDIFF(hour, pickup_datetime, dropOff_datetime) AS duration
    FROM
        tmp_table
    ORDER BY
        duration desc
    LIMIT 1
''').show()

[Stage 46:>                                                         (0 + 1) / 1]

+-----------+--------+
|pickup_date|duration|
+-----------+--------+
| 2019-10-11|  631152|
+-----------+--------+



                                                                                

### Q6

In [None]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

In [25]:
df_zones = spark.read \
    .option('header', 'true') \
    .schema(zones_schema) \
    .csv('taxi_zone_lookup.csv')

In [21]:
df_zones.schema

StructType([StructField('LocationID', StringType(), True), StructField('Borough', StringType(), True), StructField('Zone', StringType(), True), StructField('service_zone', StringType(), True)])

In [23]:
zones_schema = types.StructType(
    [
        types.StructField('LocationID', types.IntegerType(), True), 
        types.StructField('Borough', types.StringType(), True), 
        types.StructField('Zone', types.StringType(), True), 
        types.StructField('service_zone', types.StringType(), True)
    ]
)


In [27]:
df_zones.createOrReplaceTempView('tmp_zones')

In [20]:
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [32]:
spark.sql('''
    SELECT 
    tmp_table.PUlocationID
    ,tmp_zones.Borough
    ,tmp_zones.Zone
    ,COUNT(1) rides
    FROM 
        tmp_table
    LEFT JOIN
        tmp_zones
    ON
        tmp_table.PUlocationID = tmp_zones.LocationID
    GROUP BY
        1, 2, 3
    ORDER BY 4 ASC
''').show()

[Stage 22:>                                                         (0 + 1) / 1]

+------------+-------------+--------------------+-----+
|PUlocationID|      Borough|                Zone|rides|
+------------+-------------+--------------------+-----+
|           2|       Queens|         Jamaica Bay|    1|
|         105|    Manhattan|Governor's Island...|    2|
|         111|     Brooklyn| Green-Wood Cemetery|    5|
|          30|       Queens|       Broad Channel|    8|
|         120|    Manhattan|     Highbridge Park|   14|
|          12|    Manhattan|        Battery Park|   15|
|         207|       Queens|Saint Michaels Ce...|   23|
|          27|       Queens|Breezy Point/Fort...|   25|
|         154|     Brooklyn|Marine Park/Floyd...|   26|
|           8|       Queens|        Astoria Park|   29|
|         128|    Manhattan|    Inwood Hill Park|   39|
|         253|       Queens|       Willets Point|   47|
|          96|       Queens|Forest Park/Highl...|   53|
|          34|     Brooklyn|  Brooklyn Navy Yard|   57|
|          59|        Bronx|        Crotona Park

                                                                                