In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [2]:
# Build the notebook's Spark session against the "local[*]" master,
# reusing an already-running session if there is one (getOrCreate).
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/02/29 19:55:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/02/29 19:55:40 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Display the version of Spark behind this session.
spark.version

'3.3.2'

In [4]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2024-02-29 19:59:13--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 140.82.114.4
Connecting to github.com (github.com)|140.82.114.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240229%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240229T195913Z&X-Amz-Expires=300&X-Amz-Signature=168500d3783d3a390f4b094b4393864df281cd926fd3715c8ac59e5fa6c9fcbf&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-02-29 19:59:13--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-

In [6]:
# Confirm the archive is on disk and check its size.
!ls -lh fhv_tripdata_2019-10.csv.gz

-rw-rw-r-- 1 Rodolfo Rodolfo 19M Dec  2  2022 fhv_tripdata_2019-10.csv.gz


In [7]:
# Explicit schema for the FHV trip CSV: one nullable field per column,
# listed in the same order as the columns are declared below.
fhv_schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("dispatching_base_num", types.StringType()),
        ("pickup_datetime", types.TimestampType()),
        ("dropOff_datetime", types.TimestampType()),
        ("PUlocationID", types.IntegerType()),
        ("DOlocationID", types.IntegerType()),
        ("SR_Flag", types.IntegerType()),
        ("Affiliated_base_number", types.StringType()),
    ]
])

In [8]:
# Read the gzipped CSV using the explicit schema defined in fhv_schema;
# the first line of the file is treated as a header.
df = (
    spark.read
    .option("header", "true")
    .schema(fhv_schema)
    .csv('fhv_tripdata_2019-10.csv.gz')
)

# Repartition into 6 partitions and store the result as Parquet.
# mode('overwrite') keeps this cell re-runnable: without it the write raises an
# error as soon as the output directory exists from a previous run.
df.repartition(6).write.mode('overwrite').parquet('data/pq/fhvhv/2019/10/')

                                                                                

In [9]:
# List the Parquet files produced by the write to inspect their count and sizes.
!ls -lh data/pq/fhvhv/2019/10/

total 39M
-rw-r--r-- 1 Rodolfo Rodolfo    0 Feb 29 20:03 _SUCCESS
-rw-r--r-- 1 Rodolfo Rodolfo 6.4M Feb 29 20:03 part-00000-3b1ed104-cc03-4306-bb0a-973c3530404a-c000.snappy.parquet
-rw-r--r-- 1 Rodolfo Rodolfo 6.4M Feb 29 20:03 part-00001-3b1ed104-cc03-4306-bb0a-973c3530404a-c000.snappy.parquet
-rw-r--r-- 1 Rodolfo Rodolfo 6.4M Feb 29 20:03 part-00002-3b1ed104-cc03-4306-bb0a-973c3530404a-c000.snappy.parquet
-rw-r--r-- 1 Rodolfo Rodolfo 6.4M Feb 29 20:03 part-00003-3b1ed104-cc03-4306-bb0a-973c3530404a-c000.snappy.parquet
-rw-r--r-- 1 Rodolfo Rodolfo 6.4M Feb 29 20:03 part-00004-3b1ed104-cc03-4306-bb0a-973c3530404a-c000.snappy.parquet
-rw-r--r-- 1 Rodolfo Rodolfo 6.4M Feb 29 20:03 part-00005-3b1ed104-cc03-4306-bb0a-973c3530404a-c000.snappy.parquet


In [10]:
# Rebind df to the Parquet copy of the data; every later cell queries this DataFrame.
df = spark.read.parquet('data/pq/fhvhv/2019/10/')

**Q3**: How many taxi trips were there on the 15th of October?

In [11]:
from pyspark.sql import functions as F

In [12]:
# Q3 via the DataFrame API: derive the pickup date from the pickup timestamp,
# keep only trips picked up on 2019-10-15, and count them.
(
    df
    .withColumn('pickup_date', F.to_date(df['pickup_datetime']))
    .filter("pickup_date = '2019-10-15'")
    .count()
)

                                                                                

62610

In [13]:
# Expose df to Spark SQL under the name fhvhv_2019_10.
# createOrReplaceTempView replaces the deprecated registerTempTable and can be
# re-run safely because it overwrites any existing view with the same name.
df.createOrReplaceTempView('fhvhv_2019_10')



In [14]:
# Q3 again, this time in Spark SQL: count the rows of the fhvhv_2019_10 view
# whose pickup timestamp falls on 2019-10-15. Serves as a cross-check of the
# DataFrame-API count.
spark.sql("""
SELECT
    COUNT(1)
FROM 
    fhvhv_2019_10
WHERE
    to_date(pickup_datetime) = '2019-10-15';
""").show()

+--------+
|count(1)|
+--------+
|   62610|
+--------+



**Q4**: Longest trip in the dataset (duration in hours)

In [29]:
# Look up the exact column names before writing the duration query.
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropOff_datetime',
 'PUlocationID',
 'DOlocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [37]:
# Show the 10 longest trips. Duration is the difference between the drop-off and
# pickup timestamps in seconds (unix_timestamp), divided by 3600 to get hours.
# "ORDER BY 3" sorts by the third select-list column, trip_duration_hours.
# NOTE(review): very large durations may come from bad drop-off timestamps in
# the raw data rather than real trips; confirm before relying on the top rows.
spark.sql("""
SELECT
    pickup_datetime,
    dropOff_datetime,
    (unix_timestamp(dropOff_datetime) - unix_timestamp(pickup_datetime)) / 3600 as trip_duration_hours
FROM 
    fhvhv_2019_10
ORDER BY
    3 DESC
LIMIT 10;
""").show()



+-------------------+-------------------+-------------------+
|    pickup_datetime|   dropOff_datetime|trip_duration_hours|
+-------------------+-------------------+-------------------+
|2019-10-11 18:00:00|2091-10-11 18:30:00|           631152.5|
|2019-10-28 09:00:00|2091-10-28 09:30:00|           631152.5|
|2019-10-31 23:46:33|2029-11-01 00:13:00|  87672.44083333333|
|2019-10-01 21:43:42|2027-10-01 21:45:23|  70128.02805555555|
|2019-10-17 14:00:00|2020-10-18 00:00:00|             8794.0|
|2019-10-26 21:26:00|2020-10-26 21:36:00|  8784.166666666666|
|2019-10-30 12:30:04|2019-12-30 13:02:08| 1464.5344444444445|
|2019-10-25 07:04:57|2019-12-08 07:54:33| 1056.8266666666666|
|2019-10-25 07:04:57|2019-12-08 07:21:11| 1056.2705555555556|
|2019-10-01 13:47:17|2019-11-03 15:20:28|  793.5530555555556|
+-------------------+-------------------+-------------------+



                                                                                

**Q5**: User Interface

On which local port does Spark's User Interface, which shows the application's dashboard, run?



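- Answer: 4040 (Spark's default UI port). In this particular session port 4040 was already in use, so Spark fell back to 4041, as the startup warning near the top of the notebook shows.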

**Q6**: Least frequent pickup location zone

In [21]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2024-02-29 22:44:12--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.113.4
Connecting to github.com (github.com)|140.82.113.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240229%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240229T224412Z&X-Amz-Expires=300&X-Amz-Signature=36f2f7a8ecd73f95ce2f32cb19ff0f2c5f83bb58957718e3bf774606b721db08&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2024-02-29 22:44:12--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

In [24]:
# Load the zone lookup CSV, treating its first line as a header.
# No schema is supplied here (unlike the FHV read), so column types are left
# to Spark's CSV reader defaults.
df_zones = spark.read.option("header", "true").csv('taxi_zone_lookup.csv')

In [25]:
# Column names of the zone lookup, needed to pick the join key.
df_zones.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [26]:
# Column names of the trips data, to find the location-id columns to join on.
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropOff_datetime',
 'PUlocationID',
 'DOlocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [27]:
# Expose df_zones to Spark SQL under the name zones.
# createOrReplaceTempView replaces the deprecated registerTempTable and can be
# re-run safely because it overwrites any existing view with the same name.
df_zones.createOrReplaceTempView('zones')

In [28]:
# Q6: count trips per pickup zone and show the 5 zones with the fewest trips.
# Only the pickup side is joined to the zones view. A drop-off lookup is not
# needed for this question; joining it anyway would add an extra join and
# would inflate the counts if a LocationID ever appeared more than once in zones.
# LEFT JOIN keeps trips whose PULocationID has no match in zones; those rows
# are grouped under a NULL pu_zone.
# "GROUP BY 1" / "ORDER BY 2" refer to select-list positions: pu_zone and the count.
spark.sql("""
SELECT
    pul.Zone AS pu_zone,
    COUNT(1)
FROM 
    fhvhv_2019_10 fhv LEFT JOIN zones pul ON fhv.PULocationID = pul.LocationID
GROUP BY 
    1
ORDER BY
    2 ASC
LIMIT 5;
""").show()



+--------------------+--------+
|             pu_zone|count(1)|
+--------------------+--------+
|         Jamaica Bay|       1|
|Governor's Island...|       2|
| Green-Wood Cemetery|       5|
|       Broad Channel|       8|
|     Highbridge Park|      14|
+--------------------+--------+



                                                                                

- Answer: Jamaica Bay