In [1]:
from pathlib import Path

import findspark
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
# Let findspark locate the local Spark installation so pyspark can be used from this kernel.
# NOTE(review): findspark.init() is normally called *before* `import pyspark`; here the pyspark
# imports in the first cell have already run, so this call presumably has no effect — consider
# moving it above those imports.
findspark.init()

In [3]:
# Create (or reuse) a Spark session that runs locally using all available cores.
spark = SparkSession.builder.master("local[*]").appName("NYC Taxi Batch processing").getOrCreate()

24/03/02 02:01:39 WARN Utils: Your hostname, sls-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.29.248 instead (on interface en0)
24/03/02 02:01:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/02 02:01:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Q1

What is the version of Spark in use?

In [4]:
# Q1: the Spark version of the running session.
spark.version

'3.5.1'

Q2

FHV October 2019

Read the October 2019 FHV (for-hire vehicle) trip data into a Spark DataFrame with a schema, as we did in the lessons.

Repartition the DataFrame to 6 partitions and save it to Parquet.

What is the average size (in MB) of the Parquet files (those ending with the .parquet extension) that were created? Select the answer that most closely matches.

In [5]:
# Source for Q2: the October 2019 FHV trip records as a single Parquet file.
file_url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2019-10.parquet'

In [6]:
# Download the file into Spark's per-application file directory; its local path is later
# obtained with SparkFiles.get(<file name>) when reading it.
spark.sparkContext.addFile(file_url)

In [7]:
# Intended schema for the October 2019 FHV file.
# NOTE: it is not passed to the reader below. The file stores PUlocationID/DOlocationID as
# DOUBLE (and SR_Flag as an integer), and reading with this schema fails with
# "Parquet column cannot be converted ... Expected: int, Found: DOUBLE".
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("dispatching_base_num", StringType()),
        ("pickup_datetime", TimestampType()),
        ("dropoff_datetime", TimestampType()),
        ("PULocationID", IntegerType()),
        ("DOLocationID", IntegerType()),
        ("SR_Flag", StringType()),
        ("Affiliated_base_number", StringType()),
    )
])

In [8]:
# Why the schema above is not applied when reading the file:
# passing it to the reader fails with
#   Parquet column cannot be converted in file .../fhv_tripdata_2019-10.parquet.
#   Column: [PUlocationID], Expected: int, Found: DOUBLE
# because the file stores the location IDs as DOUBLE.
# Disabling the vectorized Parquet reader does NOT fix the error:
# spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")

In [9]:
# Read the downloaded file from Spark's local file cache and let Spark take the column types
# from the file itself. The `schema` defined above is intentionally not applied: the file
# stores the location IDs as DOUBLE, which its IntegerType fields reject.
df = spark.read.parquet(SparkFiles.get(file_url.rsplit("/", 1)[-1]))

                                                                                

In [10]:
# Inspect the column types actually stored in the file (note the DOUBLE location IDs).
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropOff_datetime: timestamp_ntz (nullable = true)
 |-- PUlocationID: double (nullable = true)
 |-- DOlocationID: double (nullable = true)
 |-- SR_Flag: integer (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [11]:
# Total number of trip records in the file.
df.count()

1897856

In [12]:
# Spread the rows over 6 partitions so the write below produces 6 Parquet files (Q2).
# NOTE: this rebinds `df`; every later cell works on the repartitioned DataFrame.
df = df.repartition(6)

In [13]:
# Write the partitions as Parquet files, replacing any output left by a previous run.
df.write.parquet("../data/fhv/2019/10", mode="overwrite")

                                                                                

In [14]:
!ls -lh "../data/fhv/2019/10"

total 74496
-rw-r--r--  1 home  staff     0B Mar  2 02:01 _SUCCESS
-rw-r--r--  1 home  staff   5.7M Mar  2 02:01 part-00000-bfff2089-876d-4ae8-8585-5b21bbbd01c5-c000.snappy.parquet
-rw-r--r--  1 home  staff   5.6M Mar  2 02:01 part-00001-bfff2089-876d-4ae8-8585-5b21bbbd01c5-c000.snappy.parquet
-rw-r--r--  1 home  staff   5.7M Mar  2 02:01 part-00002-bfff2089-876d-4ae8-8585-5b21bbbd01c5-c000.snappy.parquet
-rw-r--r--  1 home  staff   5.7M Mar  2 02:01 part-00003-bfff2089-876d-4ae8-8585-5b21bbbd01c5-c000.snappy.parquet
-rw-r--r--  1 home  staff   5.7M Mar  2 02:01 part-00004-bfff2089-876d-4ae8-8585-5b21bbbd01c5-c000.snappy.parquet
-rw-r--r--  1 home  staff   5.7M Mar  2 02:01 part-00005-bfff2089-876d-4ae8-8585-5b21bbbd01c5-c000.snappy.parquet


Q3

Count records

How many taxi trips were there on the 15th of October?

Consider only trips that started on the 15th of October.

In [15]:
# Q3: count trips whose pickup timestamp falls on 15 October 2019, compared by calendar date.
df.where(to_date("pickup_datetime") == "2019-10-15").count()

62629

Q4

Longest trip for each day

What is the length of the longest trip in the dataset in hours?

In [16]:
# Q4: longest trip per pickup date in hours, showing only the single longest day.
# The timestamp difference is cast to LongType (whole seconds) and divided by 3600 to get hours.
(
    df.select("dropoff_datetime", "pickup_datetime")
      .withColumn("duration", (col("dropoff_datetime") - col("pickup_datetime")).cast(LongType()) / 3600)
      .withColumn("pickup_date", to_date("pickup_datetime"))
      .groupBy("pickup_date")
      .agg({"duration": "max"})
      .withColumnRenamed("max(duration)", "max_duration_hrs")
      .orderBy(desc("max_duration_hrs"))
      .limit(1)
      .show()
)

                                                                                

+-----------+----------------+
|pickup_date|max_duration_hrs|
+-----------+----------------+
| 2019-10-28|        631152.5|
+-----------+----------------+



Q5

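Spark's User Interface, which shows the application's dashboard, runs on which local port?

Port 4040: the UI is served at http://localhost:4040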

Q6
Least frequent pickup location zone

Load the zone lookup data into a temp view in Spark
Zone Data

Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location zone?



In [17]:
# Zone lookup table (LocationID -> zone name); the first CSV line holds the column names.
# Without schema inference every column is read as a string, hence the cast in the join query.
df_zones = spark.read.option("header", "true").csv('../../input_data/taxi_zone_lookup.csv')

In [18]:
# Register both DataFrames as temp views so the SQL query can reference them by name.
for view_name, frame in (("fhv_2019_10", df), ("taxi_trip_zones", df_zones)):
    frame.createOrReplaceTempView(view_name)

In [19]:
# Q6: pickups per zone, least frequent first.
# - An inner join (instead of LEFT JOIN) drops trips whose PULocationID is NULL or has no match
#   in the lookup table; otherwise they form a NULL "zone" group that could be reported as the
#   least frequent zone.
# - PULocationID is DOUBLE in the trip file while LocationID is a string from the CSV, so the
#   lookup key is cast to double before comparing.
# - `zone` is a secondary sort key so ties in num_pickups are ordered deterministically.
# NOTE(review): grouping is by zone *name*, so location IDs that share a name (if any) are merged.
spark.sql(
    """
    select
     b.zone,
     count(*) num_pickups
    from
     fhv_2019_10 a
     join taxi_trip_zones b on a.PULocationID = cast(b.LocationID as double)
    group by
     b.zone
    order by
     num_pickups asc,
     b.zone asc
    """
).show(3)
     

+--------------------+-----------+
|                zone|num_pickups|
+--------------------+-----------+
|         Jamaica Bay|          1|
|Governor's Island...|          2|
| Green-Wood Cemetery|          5|
+--------------------+-----------+
only showing top 3 rows

