### Importing Modules

In [1]:
import pyspark
from pyspark.sql import SparkSession
import os
import pandas as pd

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/06 16:50:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Downloading FHVHV Data

In [3]:
! wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz

--2023-03-06 10:17:10--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz
Resolving github.com (github.com)... 140.82.112.3
Connecting to github.com (github.com)|140.82.112.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e-a6da-4923-ad6f-35ff02446a51?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230306%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230306T101710Z&X-Amz-Expires=300&X-Amz-Signature=3beec242d578169485141e0f4fb33d730d5e73ac163b47e4931eebd8203dad76&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhvhv_tripdata_2021-06.csv.gz&response-content-type=application%2Foctet-stream [following]
--2023-03-06 10:17:10--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e

### Downloading the Zone Lookup Data

In [9]:
! wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2023-03-06 10:23:35--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.114.3
Connecting to github.com (github.com)|140.82.114.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230306%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230306T102335Z&X-Amz-Expires=300&X-Amz-Signature=2a17b2c5a01454ff17e970ddd630f29e68defc8909205ac2c6026443d111f7aa&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2023-03-06 10:23:35--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

### Reading the FHVHV Dataset

In [3]:
df = spark.read \
    .option("header", "true") \
    .csv('fhvhv_tripdata_2021-06.csv.gz')

df.show()



+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|                  null|
|              B02510|2021-06-01 00:18:1

In [4]:
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [6]:
from pyspark.sql import types


### Creating a Schema for the Data

In [7]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

### Reading the Csv file with the Schema

In [8]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-06.csv.gz')

df.show()



+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|                  null|
|              B02510|2021-06-01 00:18:1

In [9]:
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'Affiliated_base_number']

### Spiltting the Data into Parts & Saving it as Parquet Files

In [9]:
df = df.repartition(12)

In [11]:
df.write.parquet('homework/2021/06/', mode='overwrite')

                                                                                

### Reading in the New Parquet File with the Schema

In [10]:
df3 = spark.read.parquet('homework/2021/06/')

In [11]:
df3.count()

14961892

In [12]:
df3.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [13]:
from pyspark.sql.functions import to_date, unix_timestamp, from_unixtime
from pyspark.sql.types import DateType

In [14]:
df4 = df3.withColumn('pickup_date', to_date(df3['pickup_datetime']).cast(DateType()))
df4 = df4.withColumn("pickup_timestamp", unix_timestamp(df3["pickup_datetime"]))
df4 = df4.withColumn("dropoff_timestamp", unix_timestamp(df3["dropoff_datetime"]))
df4 = df4.withColumn("duration_seconds", df4["dropoff_timestamp"] - df4["pickup_timestamp"])
df4 = df4.withColumn("trip_duration", from_unixtime(df4["duration_seconds"], format="HH:mm:ss"))
df4.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+----------------+-----------------+----------------+-------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|pickup_date|pickup_timestamp|dropoff_timestamp|duration_seconds|trip_duration|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+----------------+-----------------+----------------+-------------+
|              B02889|2021-06-04 20:51:44|2021-06-04 21:10:12|         239|         158|      N|                B02889| 2021-06-04|      1622839904|       1622841012|            1108|     00:18:28|
|              B02800|2021-06-04 15:50:15|2021-06-04 16:19:29|          75|         116|      N|                  null| 2021-06-04|      1622821815|       1622823569|            1754|     00:29:14|
|         

In [15]:
df4.select('pickup_datetime','pickup_timestamp', 'dropoff_datetime','dropoff_timestamp', 'trip_duration', 'duration_seconds').show()

+-------------------+----------------+-------------------+-----------------+-------------+----------------+
|    pickup_datetime|pickup_timestamp|   dropoff_datetime|dropoff_timestamp|trip_duration|duration_seconds|
+-------------------+----------------+-------------------+-----------------+-------------+----------------+
|2021-06-04 20:51:44|      1622839904|2021-06-04 21:10:12|       1622841012|     00:18:28|            1108|
|2021-06-04 15:50:15|      1622821815|2021-06-04 16:19:29|       1622823569|     00:29:14|            1754|
|2021-06-02 21:03:38|      1622667818|2021-06-02 21:10:12|       1622668212|     00:06:34|             394|
|2021-06-02 12:51:57|      1622638317|2021-06-02 13:05:09|       1622639109|     00:13:12|             792|
|2021-06-21 09:51:45|      1624269105|2021-06-21 10:09:17|       1624270157|     00:17:32|            1052|
|2021-06-02 13:27:03|      1622640423|2021-06-02 13:38:20|       1622641100|     00:11:17|             677|
|2021-06-10 14:48:23|      1

### How many taxi trips were there on June 15?

In [16]:
df4.select('pickup_date').filter(df4.pickup_date == '2021-06-15').count()

                                                                                

452470

### Longest trip for each day

In [17]:
df4.registerTempTable('trips_data')



In [18]:
spark.sql("""
SELECT duration_seconds, trip_duration, pickup_datetime, dropoff_datetime
FROM trips_data
ORDER BY duration_seconds desc
""").show()






+----------------+-------------+-------------------+-------------------+
|duration_seconds|trip_duration|    pickup_datetime|   dropoff_datetime|
+----------------+-------------+-------------------+-------------------+
|          240764|     18:52:44|2021-06-25 13:55:41|2021-06-28 08:48:25|
|           91979|     01:32:59|2021-06-22 12:09:45|2021-06-23 13:42:44|
|           71931|     19:58:51|2021-06-27 10:32:29|2021-06-28 06:31:20|
|           65510|     18:11:50|2021-06-26 22:37:11|2021-06-27 16:49:01|
|           59281|     16:28:01|2021-06-23 20:40:43|2021-06-24 13:08:44|
|           51368|     14:16:08|2021-06-23 22:03:31|2021-06-24 12:19:39|
|           50075|     13:54:35|2021-06-24 23:11:00|2021-06-25 13:05:35|
|           42012|     11:40:12|2021-06-04 20:56:02|2021-06-05 08:36:14|
|           40917|     11:21:57|2021-06-27 07:45:19|2021-06-27 19:07:16|
|           39544|     10:59:04|2021-06-20 17:05:12|2021-06-21 04:04:16|
|           36963|     10:16:03|2021-06-01 12:25:29

                                                                                

### Most frequent pickup location zone

In [23]:
zone_df = spark.read.option("header", "True").csv('taxi_zone_lookup.csv')

In [27]:
schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('Zone', types.StringType(), True),
    types.StructField('service_zone', types.StringType(), True)
])

In [28]:
zone_df = spark.read. \
            option("header", "True"). \
            schema(schema). \
            csv('taxi_zone_lookup.csv')

In [29]:
zone_df.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [30]:
zone_df.registerTempTable('zone_data')

In [40]:
spark.sql("""
SELECT zone.Zone as Zone, count(1) AS Freq
FROM trips_data join zone_data as zone
ON trips_data.PULocationID == zone.LocationID
GROUP BY Zone
ORDER BY Freq Desc
""").show()






+--------------------+------+
|                Zone|  Freq|
+--------------------+------+
| Crown Heights North|231279|
|        East Village|221244|
|         JFK Airport|188867|
|      Bushwick South|187929|
|       East New York|186780|
|TriBeCa/Civic Center|164344|
|   LaGuardia Airport|161596|
|            Union Sq|158937|
|        West Village|154698|
|             Astoria|152493|
|     Lower East Side|151020|
|        East Chelsea|147673|
|Central Harlem North|146402|
|Williamsburg (Nor...|143683|
|          Park Slope|143594|
|  Stuyvesant Heights|141427|
|        Clinton East|139611|
|West Chelsea/Huds...|139431|
|             Bedford|138428|
|         Murray Hill|137879|
+--------------------+------+
only showing top 20 rows



                                                                                