In [2]:
from pyspark.sql import SparkSession

In [40]:
from pyspark.sql import functions as F

In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
spark

In [14]:
fhvhv = 'gs://dtc_data_lake_de-zoomcamp-376202/fhvhv_tripdata_2021-06.csv.gz'

In [52]:
fhvhv_df = spark.read.csv(fhvhv,inferSchema=True, header=True)

                                                                                

In [24]:
output_path = 'gs://dtc_data_lake_de-zoomcamp-376202/fhv_pq/'

In [27]:
fhvhv_df \
        .repartition(12) \
        .write.parquet(output_path)

                                                                                

In [53]:
fhvhv_df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|                  null|
|              B02510|2021-06-01 00:18:1

In [54]:
fhvhv_df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [20]:
lookup = 'gs://dtc_data_lake_de-zoomcamp-376202/taxi+_zone_lookup.csv'

In [21]:
lookup_df = spark.read.csv(lookup,inferSchema=True, header=True)

In [22]:
lookup_df.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [64]:
lookup_df.show(10)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 10 rows



In [55]:
fhvhv_df.createOrReplaceTempView('fhvhv_data')

In [56]:
spark.sql('''
select
      date_trunc('day',pickup_datetime) as date
      ,count(*)
from fhvhv_data
where date_trunc('day',pickup_datetime) = '2021-06-15'
group by 1
''').show()


[Stage 37:>                                                         (0 + 1) / 1]

+-------------------+--------+
|               date|count(1)|
+-------------------+--------+
|2021-06-15 00:00:00|  452470|
+-------------------+--------+



                                                                                

In [57]:
fhvhv_df = fhvhv_df \
                    .withColumn(
                                'diff_in_',(F.unix_timestamp('dropoff_datetime') - F.unix_timestamp('pickup_datetime')) / 3600 \
                    )

In [58]:
fhvhv_df.head()

Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 2, 41), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 7, 46), PULocationID=174, DOLocationID=18, SR_Flag='N', Affiliated_base_number='B02764', diff_in_=0.08472222222222223)

In [59]:
fhvhv_df.createOrReplaceTempView('fhvhv_data')

In [60]:
spark.sql('''
select
      *
from fhvhv_data
limit 10
''').show()


+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|           diff_in_|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|0.08472222222222223|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|0.08277777777777778|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|0.25277777777777777|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|0.12694444444444444|
|              B0251

In [62]:
spark.sql('''
select
      max(diff_in_) as max_hour
from fhvhv_data
''').show()


[Stage 44:>                                                         (0 + 1) / 1]

+----------------+
|        max_hour|
+----------------+
|66.8788888888889|
+----------------+



                                                                                

In [66]:
fhvhv_df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)
 |-- diff_in_: double (nullable = true)



In [69]:
lookup_df.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [71]:
final_df = fhvhv_df.join(lookup_df, fhvhv_df.PULocationID == lookup_df.LocationID)

In [72]:
final_df.show(10)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------------+----------+---------+-------------------+------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|           diff_in_|LocationID|  Borough|               Zone|service_zone|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------------+----------+---------+-------------------+------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|0.08472222222222223|       174|    Bronx|            Norwood|   Boro Zone|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|0.08277777777777778|        32|    Bronx|          Bronxdale|   Boro Zone|
|              B02764|2021-06-

In [73]:
final_df.createOrReplaceTempView('final_data')

In [75]:
spark.sql('''
select
      zone
      ,count(*) as total_pickups
from final_data
group by 1
order by count(*) desc
''').show()

[Stage 52:>                                                         (0 + 1) / 1]

+--------------------+-------------+
|                zone|total_pickups|
+--------------------+-------------+
| Crown Heights North|       231279|
|        East Village|       221244|
|         JFK Airport|       188867|
|      Bushwick South|       187929|
|       East New York|       186780|
|TriBeCa/Civic Center|       164344|
|   LaGuardia Airport|       161596|
|            Union Sq|       158937|
|        West Village|       154698|
|             Astoria|       152493|
|     Lower East Side|       151020|
|        East Chelsea|       147673|
|Central Harlem North|       146402|
|Williamsburg (Nor...|       143683|
|          Park Slope|       143594|
|  Stuyvesant Heights|       141427|
|        Clinton East|       139611|
|West Chelsea/Huds...|       139431|
|             Bedford|       138428|
|         Murray Hill|       137879|
+--------------------+-------------+
only showing top 20 rows



                                                                                