In [1]:
import pyspark
from pyspark.sql import SparkSession, types, functions as F

### Question 1: 

**Install Spark and PySpark** 

- Install Spark
- Run PySpark
- Create a local spark session
- Execute spark.version.

What's the output?

In [2]:
pyspark.__version__

'3.5.1'

Ans: 3.5.1

### Question 2: 

**FHV October 2019**

Using the FHV 2019-10 data found [here](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz), read the October 2019 FHV into a Spark Dataframe with a schema as we did in the lessons.

Repartition the Dataframe to 6 partitions and save it to parquet.

What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)? Select the answer which most closely matches.

- 1MB
- 6MB
- 25MB
- 87MB

In [3]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2024-03-07 10:23:02--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 20.205.243.166
Connecting to github.com (github.com)|20.205.243.166|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240307%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240307T022303Z&X-Amz-Expires=300&X-Amz-Signature=518d8cf5ff15625350e3fbd451ff94c16d1aa140d5dfe116f9ef82a26a338d2e&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-07 10:23:03--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6

In [4]:
spark = SparkSession.builder.master("local[*]").appName("taxi").getOrCreate()

your 131072x1 screen size is bogus. expect trouble
24/03/07 10:23:15 WARN Utils: Your hostname, DESKTOP-83AGO4K resolves to a loopback address: 127.0.1.1; using 172.26.223.117 instead (on interface eth0)
24/03/07 10:23:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/07 10:23:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/07 10:23:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropOff_datetime', types.TimestampType(), True),
    types.StructField('PUlocationID', types.IntegerType(), True),
    types.StructField('DOlocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [6]:
df = spark.read \
    .option("header", "true") \
    .option("ignoreLeadingWhiteSpace", "true") \
    .option("ignoreTrailingWhiteSpace", "true") \
    .schema(schema) \
    .csv("fhv_tripdata_2019-10.csv.gz", inferSchema=True)

In [7]:
df.show()

                                                                                

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
|              B00021|2019-10-01 00:00:4

In [8]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [9]:
df.repartition(6).write.parquet("fhv/2019/10/")

                                                                                

In [10]:
!ls -l fhv/2019/10

total 39048
-rw-r--r-- 1 wongcheefah wongcheefah       0 Mar  7 10:24 _SUCCESS
-rw-r--r-- 1 wongcheefah wongcheefah 6670888 Mar  7 10:24 part-00000-5150ad4f-fd8e-44d5-951a-dae93430285e-c000.snappy.parquet
-rw-r--r-- 1 wongcheefah wongcheefah 6667249 Mar  7 10:24 part-00001-5150ad4f-fd8e-44d5-951a-dae93430285e-c000.snappy.parquet
-rw-r--r-- 1 wongcheefah wongcheefah 6654742 Mar  7 10:24 part-00002-5150ad4f-fd8e-44d5-951a-dae93430285e-c000.snappy.parquet
-rw-r--r-- 1 wongcheefah wongcheefah 6665760 Mar  7 10:24 part-00003-5150ad4f-fd8e-44d5-951a-dae93430285e-c000.snappy.parquet
-rw-r--r-- 1 wongcheefah wongcheefah 6663196 Mar  7 10:24 part-00004-5150ad4f-fd8e-44d5-951a-dae93430285e-c000.snappy.parquet
-rw-r--r-- 1 wongcheefah wongcheefah 6653765 Mar  7 10:24 part-00005-5150ad4f-fd8e-44d5-951a-dae93430285e-c000.snappy.parquet


Ans: 6MB

### Question 3: 

**Count records** 

How many taxi trips were there on the 15th of October?

Consider only trips that started on the 15th of October.

- 108,164
- 12,856
- 452,470
- 62,610


In [11]:
df = df.withColumn("pickup_date", F.to_date(df.pickup_datetime))

In [12]:
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|pickup_date|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009| 2019-10-01|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013| 2019-10-01|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014| 2019-10-01|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014| 2019-10-01|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264| 

In [13]:
df.groupBy("pickup_date").count().where(F.col("pickup_date") == "2019-10-15").show()

[Stage 5:>                                                          (0 + 1) / 1]

+-----------+-----+
|pickup_date|count|
+-----------+-----+
| 2019-10-15|62610|
+-----------+-----+



                                                                                

Ans: 62,610

### Question 4: 

**Longest trip for each day** 

What is the length of the longest trip in the dataset in hours?

- 631,152.50 Hours
- 243.44 Hours
- 7.68 Hours
- 3.32 Hours

In [14]:
def trip_duration(pickup_datetime, dropOff_datetime):
    duration = abs(dropOff_datetime - pickup_datetime)
    return duration.total_seconds() / 3600.0

In [15]:
trip_duration_udf = F.udf(trip_duration, returnType=types.FloatType())

In [16]:
df = df.withColumn("trip_duration", trip_duration_udf(df.pickup_datetime, df.dropOff_datetime))

In [17]:
df.orderBy("trip_duration", ascending=False).show()

[Stage 8:>                                                          (0 + 1) / 1]

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+-------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|pickup_date|trip_duration|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+-------------+
|              B02832|2019-10-11 18:00:00|2091-10-11 18:30:00|         264|         264|   NULL|                B02832| 2019-10-11|     631152.5|
|              B02832|2019-10-28 09:00:00|2091-10-28 09:30:00|         264|         264|   NULL|                B02832| 2019-10-28|     631152.5|
|              B02416|2019-10-31 23:46:33|2029-11-01 00:13:00|        NULL|        NULL|   NULL|                B02416| 2019-10-31|     87672.44|
|              B00746|2019-10-01 21:43:42|2027-10-01 21:45:23|         159|         264|   NULL|                B00746| 2019

                                                                                

In [18]:
df.agg(F.max("trip_duration")).show()

[Stage 9:>                                                          (0 + 1) / 1]

+------------------+
|max(trip_duration)|
+------------------+
|          631152.5|
+------------------+



                                                                                

Ans: 631,152.50 Hours

### Question 5: 

**User Interface**

Spark’s User Interface which shows the application's dashboard runs on which local port?

- 80
- 443
- 4040
- 8080

Ans: 4040

### Question 6: 

**Least frequent pickup location zone**

Load the zone lookup data into a temp view in Spark</br>
[Zone Data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv)

Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location Zone?</br>

- East Chelsea
- Jamaica Bay
- Union Sq
- Crown Heights North

In [19]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2024-03-07 10:26:30--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 20.205.243.166
Connecting to github.com (github.com)|20.205.243.166|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240307%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240307T022630Z&X-Amz-Expires=300&X-Amz-Signature=9c502d3d7a2128ae74ef1938924d35a9b7cec0ade42121ae420176343bcc6df8&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2024-03-07 10:26:30--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62

In [20]:
df_zones = spark.read.option("header", "true").csv("taxi_zone_lookup.csv")

In [22]:
df_zones.createOrReplaceTempView("zones")

In [23]:
df.createOrReplaceTempView('fhv_tripdata')

In [24]:
spark.sql(
    """
    SELECT Zone, COUNT(Zone) AS zone_count
    FROM fhv_tripdata
    LEFT JOIN zones
    ON fhv_tripdata.PUlocationID = zones.LocationID
    GROUP BY Zone
    ORDER BY zone_count ASC
    """
).show()


[Stage 14:>                                                         (0 + 1) / 1]

+--------------------+----------+
|                Zone|zone_count|
+--------------------+----------+
|                NULL|         0|
|         Jamaica Bay|         1|
|Governor's Island...|         2|
| Green-Wood Cemetery|         5|
|       Broad Channel|         8|
|     Highbridge Park|        14|
|        Battery Park|        15|
|Saint Michaels Ce...|        23|
|Breezy Point/Fort...|        25|
|Marine Park/Floyd...|        26|
|        Astoria Park|        29|
|    Inwood Hill Park|        39|
|       Willets Point|        47|
|Forest Park/Highl...|        53|
|  Brooklyn Navy Yard|        57|
|        Crotona Park|        62|
|        Country Club|        77|
|     Freshkills Park|        89|
|       Prospect Park|        98|
|     Columbia Street|       105|
+--------------------+----------+
only showing top 20 rows



                                                                                

Ans: Jamaica Bay